Challenge #3a: Single-Node Broadcast

This third challenge is where things start to get interesting.

The first part of this challenge is called "Single-Node Broadcast". It introduces the three RPC operations our node has to handle:

  1. broadcast – This tells a node to broadcast a value to all nodes in the cluster. The value is an integer and is unique for each message sent by Maelstrom. The node should store the value so that it can be read later.
  2. read – This tells a node to return all values it has seen.
  3. topology – This informs a node of its neighbouring nodes.

As usual, let's lay the groundwork.

$ go mod init github.com/nchengyeeshen/broadcast
$ touch main.go
package main

import (
    "log"

    maelstrom "github.com/jepsen-io/maelstrom/demo/go"
)

// main creates a Maelstrom node and runs it, terminating the program via
// log.Fatal if Run returns an error.
func main() {
    node := maelstrom.NewNode()

    err := node.Run()
    if err != nil {
        log.Fatal(err)
    }
}

To keep the main function simple, let's introduce a new Server struct to keep all the handlers grouped together.

// Server groups the RPC handler methods together with the Maelstrom node
// they are registered on.
type Server struct {
    // Node is the underlying Maelstrom node.
    Node *maelstrom.Node
}

// NewServer wraps node in a Server and registers the server's handler
// methods for the "broadcast", "read" and "topology" message types on it.
// The returned Server keeps a reference to node.
func NewServer(node *maelstrom.Node) *Server {
    srv := new(Server)
    srv.Node = node

    // Route each RPC type to the method that serves it.
    node.Handle("broadcast", srv.BroadcastHandler)
    node.Handle("read", srv.ReadHandler)
    node.Handle("topology", srv.TopologyHandler)

    return srv
}

// BroadcastHandler is a placeholder for the "broadcast" RPC. It ignores the
// request and reports success.
func (s *Server) BroadcastHandler(req maelstrom.Message) error {
    return nil
}

// ReadHandler is a placeholder for the "read" RPC. It ignores the request
// and reports success.
func (s *Server) ReadHandler(req maelstrom.Message) error {
    return nil
}

// TopologyHandler is a placeholder for the "topology" RPC. It ignores the
// request and reports success.
func (s *Server) TopologyHandler(req maelstrom.Message) error {
    return nil
}

The three RPC handlers are:

  1. Server.BroadcastHandler
  2. Server.ReadHandler
  3. Server.TopologyHandler

NewServer creates a new Server instance and registers its handler methods with the incoming node.

Let's use Server in the main function.

func main() {
    srv := NewServer(maelstrom.NewNode())

    if err := srv.Run(); err != nil {
        log.Fatal(err)
    }
}

// Server groups the RPC handler methods together with the Maelstrom node
// they are registered on.
type Server struct {
    // Node is the underlying Maelstrom node; Run delegates to it.
    Node *maelstrom.Node
}

// Run delegates to Node.Run and returns whatever error it reports, so that
// callers do not need to reach into the Server for the node.
func (s *Server) Run() error {  // New!
    return s.Node.Run()
}

Note that I've added a Server.Run method to keep the node nicely encapsulated.

Let's implement the topology handler.

A topology request takes the following form:

{
  "type": "topology",
  "topology": {
    "n1": ["n2", "n3"],
    "n2": ["n1"],
    "n3": ["n1"]
  }
}

The topology object can be modeled via map[string][]string. Let's add that into our Server.

// Server groups the RPC handler methods together with the Maelstrom node
// they are registered on and the state those handlers maintain.
type Server struct {
    // Node is the underlying Maelstrom node used to reply to requests.
    Node     *maelstrom.Node
    // Topology maps a node ID to the IDs of its neighbours, as delivered by
    // the most recent topology request. It is nil until one arrives.
    Topology map[string][]string // New!
}

// TopologyHandler serves the "topology" RPC. It decodes the "topology"
// object from the request body, stores it in s.Topology (replacing any
// previous value) and acknowledges with a "topology_ok" reply.
//
// A decoding error is returned unchanged and no reply is sent; otherwise
// the result of Node.Reply is returned.
func (s *Server) TopologyHandler(req maelstrom.Message) error {
    type topologyRequest struct {
        Topology map[string][]string `json:"topology"`
    }
    type topologyReply struct {
        Type string `json:"type"`
    }

    var payload topologyRequest
    err := json.Unmarshal(req.Body, &payload)
    if err != nil {
        return err
    }

    s.Topology = payload.Topology

    ack := topologyReply{Type: "topology_ok"}
    return s.Node.Reply(req, ack)
}

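Nothing fancy going on here. We unmarshal the request body, store its topology field in Server.Topology, and then reply with a topology_ok message. (json.Unmarshal lives in the standard library, so remember to add "encoding/json" to the import block.)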

Next, let's implement the broadcast handler. A complete broadcast handler performs three things:

  1. Store the incoming value.
  2. Broadcast the incoming value. – This isn't important for this part of the challenge since there is only one node.
  3. Reply with broadcast_ok.

A broadcast request takes the following form:

{
  "type": "broadcast",
  "message": 1000
}

Let's implement storing the incoming value & replying to this request.

// Server groups the RPC handler methods together with the Maelstrom node
// they are registered on and the state those handlers maintain.
type Server struct {
    // Node is the underlying Maelstrom node used to reply to requests.
    Node     *maelstrom.Node
    // Topology maps a node ID to the IDs of its neighbours, as delivered by
    // the most recent topology request.
    Topology map[string][]string
    // Values holds every broadcast value received so far, in arrival order.
    // NOTE(review): handlers access this slice without any locking; confirm
    // whether the node may invoke handlers concurrently.
    Values   []int // New!
}

// BroadcastHandler serves the "broadcast" RPC. It decodes the integer
// "message" field from the request body, appends it to s.Values and
// acknowledges with a "broadcast_ok" reply. The value is not forwarded to
// any other node.
//
// A decoding error is returned unchanged and no reply is sent; otherwise
// the result of Node.Reply is returned.
func (s *Server) BroadcastHandler(req maelstrom.Message) error {
    type broadcastRequest struct {
        Message int `json:"message"`
    }
    type broadcastReply struct {
        Type string `json:"type"`
    }

    var payload broadcastRequest
    err := json.Unmarshal(req.Body, &payload)
    if err != nil {
        return err
    }

    s.Values = append(s.Values, payload.Message)

    ack := broadcastReply{Type: "broadcast_ok"}
    return s.Node.Reply(req, ack)
}

Last, but not least, the read handler.

A read request takes the following form:

{
  "type": "read"
}

The type field already helps us invoke the read handler, so there are no other fields to worry about.

We have to pay attention to the reply to this message, which has to be in the following form:

{
  "type": "read_ok",
  "messages": [1, 8, 72, 25]
}

Onward!

// ReadHandler serves the "read" RPC by replying with a "read_ok" message
// whose "messages" field lists every value stored in s.Values. The request
// body carries no fields of interest, so it is not decoded.
//
// It returns the result of Node.Reply.
func (s *Server) ReadHandler(req maelstrom.Message) error {
    // s.Values is nil until the first broadcast arrives, and encoding/json
    // renders a nil slice as null. Substitute an empty slice so the reply
    // always carries a JSON array, matching the documented read_ok shape.
    messages := s.Values
    if messages == nil {
        messages = []int{}
    }

    return s.Node.Reply(req, struct {
        Type     string `json:"type"`
        Messages []int  `json:"messages"`
    }{
        Type:     "read_ok",
        Messages: messages,
    })
}

Huh. That was pretty easy.

We have everything in place to start the maelstrom tests.

go build -o broadcast . && maelstrom test -w broadcast --bin ./broadcast --node-count 1 --time-limit 20 --rate 10
Everything looks good! ヽ(‘ー`)ノ

That covers it for part 1 of this third challenge!


The following is the complete code for main.go.

package main

import (
    "encoding/json"
    "log"

    maelstrom "github.com/jepsen-io/maelstrom/demo/go"
)

func main() {
    srv := NewServer(maelstrom.NewNode())

    if err := srv.Run(); err != nil {
        log.Fatal(err)
    }
}

// Server groups the RPC handler methods together with the Maelstrom node
// they are registered on and the state those handlers maintain.
type Server struct {
    // Node is the underlying Maelstrom node used to run the server and to
    // reply to requests.
    Node     *maelstrom.Node
    // Topology maps a node ID to the IDs of its neighbours, as delivered by
    // the most recent topology request. It is nil until one arrives.
    Topology map[string][]string
    // Values holds every broadcast value received so far, in arrival order.
    // NOTE(review): handlers access this slice without any locking; confirm
    // whether the node may invoke handlers concurrently.
    Values   []int
}

// Run delegates to Node.Run and returns whatever error it reports, so that
// callers do not need to reach into the Server for the node.
func (s *Server) Run() error {
    return s.Node.Run()
}

// NewServer wraps node in a Server and registers the server's handler
// methods for the "broadcast", "read" and "topology" message types on it.
// Topology and Values start out as their zero values.
func NewServer(node *maelstrom.Node) *Server {
    srv := new(Server)
    srv.Node = node

    // Route each RPC type to the method that serves it.
    node.Handle("broadcast", srv.BroadcastHandler)
    node.Handle("read", srv.ReadHandler)
    node.Handle("topology", srv.TopologyHandler)

    return srv
}

// BroadcastHandler serves the "broadcast" RPC. It decodes the integer
// "message" field from the request body, appends it to s.Values and
// acknowledges with a "broadcast_ok" reply. The value is not forwarded to
// any other node.
//
// A decoding error is returned unchanged and no reply is sent; otherwise
// the result of Node.Reply is returned.
func (s *Server) BroadcastHandler(req maelstrom.Message) error {
    type broadcastRequest struct {
        Message int `json:"message"`
    }
    type broadcastReply struct {
        Type string `json:"type"`
    }

    var payload broadcastRequest
    err := json.Unmarshal(req.Body, &payload)
    if err != nil {
        return err
    }

    s.Values = append(s.Values, payload.Message)

    ack := broadcastReply{Type: "broadcast_ok"}
    return s.Node.Reply(req, ack)
}

// ReadHandler serves the "read" RPC by replying with a "read_ok" message
// whose "messages" field lists every value stored in s.Values. The request
// body carries no fields of interest, so it is not decoded.
//
// It returns the result of Node.Reply.
func (s *Server) ReadHandler(req maelstrom.Message) error {
    // s.Values is nil until the first broadcast arrives, and encoding/json
    // renders a nil slice as null. Substitute an empty slice so the reply
    // always carries a JSON array, matching the documented read_ok shape.
    messages := s.Values
    if messages == nil {
        messages = []int{}
    }

    return s.Node.Reply(req, struct {
        Type     string `json:"type"`
        Messages []int  `json:"messages"`
    }{
        Type:     "read_ok",
        Messages: messages,
    })
}

// TopologyHandler serves the "topology" RPC. It decodes the "topology"
// object from the request body, stores it in s.Topology (replacing any
// previous value) and acknowledges with a "topology_ok" reply.
//
// A decoding error is returned unchanged and no reply is sent; otherwise
// the result of Node.Reply is returned.
func (s *Server) TopologyHandler(req maelstrom.Message) error {
    type topologyRequest struct {
        Topology map[string][]string `json:"topology"`
    }
    type topologyReply struct {
        Type string `json:"type"`
    }

    var payload topologyRequest
    err := json.Unmarshal(req.Body, &payload)
    if err != nil {
        return err
    }

    s.Topology = payload.Topology

    ack := topologyReply{Type: "topology_ok"}
    return s.Node.Reply(req, ack)
}