Creating a Blockchain: Part 7 - RPC Protocol

Creating a Blockchain: Part 7 - RPC Protocol

Understandings of RPC

RPC stands for Remote Procedure Call, and it is a protocol that allows one program to request a service or function from another program located on a remote server. In the context of blockchain, RPC is commonly used for communication between different components of a blockchain system.

Here are some of the most common types of RPC connections in blockchain and their practical uses with examples:

  1. JSON-RPC:

    • Use: JSON-RPC is a lightweight RPC protocol that uses JSON (JavaScript Object Notation) as the data format for message serialization. It is commonly used in blockchain networks for communication between nodes and external clients.

    • Practical Example: Ethereum's JSON-RPC is widely used for interacting with Ethereum nodes. Clients can use JSON-RPC requests to query blockchain data, send transactions, and interact with smart contracts. For instance, a client can use JSON-RPC to request the balance of an Ethereum address.

  2. HTTP-RPC:

    • Use: HTTP-RPC is a generic term referring to RPC protocols that use HTTP as the transport layer. It is commonly used in blockchain systems where simplicity and compatibility with web technologies are priorities.

    • Practical Example: Bitcoin's HTTP-RPC, also known as Bitcoin RPC, allows clients to interact with a Bitcoin node using HTTP requests. Clients can submit transactions, query blockchain information, and perform other operations using standard HTTP methods.

  3. WebSocket-RPC:

    • Use: WebSocket-RPC enables real-time, bidirectional communication between a client and a server over a single, long-lived connection. It is used in blockchain for scenarios requiring continuous updates and notifications.

    • Practical Example: A blockchain explorer may use WebSocket-RPC to receive real-time updates on new blocks and transactions. This allows the explorer to display the latest blockchain information to users as it happens.

It's important to note that the choice of RPC connection type depends on the specific requirements and design considerations of the blockchain network. Each type has its advantages and may be preferred based on factors such as efficiency, ease of implementation, and compatibility with existing infrastructure.

So, we are creating standards of RPC interface and implement it in this blog...


RPC structure

  1. RPC struct:

    • Represents a basic structure for Remote Procedure Call (RPC) containing information about the sender (From) and the payload (Payload).
  2. MessageType enum:

    • Defines constants for different message types, specifically for transactions (MessageTypeTx) and blocks (MessageTypeBlock).
  3. Message struct:

    • Represents a message with a header indicating its type and data. It is used to create messages with specific types.
  4. NewMessage function:

    • A constructor function for creating a new Message with a given type and data.
  5. Bytes method (on Message):

    • Converts the Message into a byte slice for sending over the network.
  6. RPCHandler interface:

    • Defines an interface for handling RPC messages, primarily focusing on the HandleRPC method.
  7. DecodedMessage struct:

    • Represents a decoded message with the sender's address (From) and the actual data (Data).
  8. RPCDecodeFunc type:

    • Represents a function type for decoding an RPC message and returning a DecodedMessage.
  9. DefaultRPCDecodeFunc function:

    • A default implementation of the RPCDecodeFunc type, which decodes a message using the gob decoder. It supports decoding transaction messages.
  10. RPCProcessor interface:

    • Defines an interface for processing decoded RPC messages, primarily focusing on the ProcessMessage method.

network/rpc.go

package network

import (
    "ProjectX/core"
    "bytes"
    "encoding/gob"
    "fmt"
    "io"

    "github.com/sirupsen/logrus"
)

type RPC struct {
    From    NetAddr
    Payload io.Reader
}

type MessageType byte

const (
    MessageTypeTx  MessageType = 0x1
    MessageTypeBlock MessageType = 0x2
)

type Message struct {
    Header MessageType
    Data []byte
}

func NewMessage(t MessageType, data []byte) *Message {
    return &Message{Header: t, Data: data}
}

func (msg *Message) Bytes() []byte {
    buf := &bytes.Buffer{}
    gob.NewEncoder(buf).Encode(msg)
    return buf.Bytes()
}

type RPCHandler interface{
    HandleRPC(rpc RPC) error
}

type DecodedMessage struct {
    From NetAddr
    Data any
}

type RPCDecodeFunc func(rpc RPC) (*DecodedMessage, error)


func DefaultRPCDecodeFunc(rpc RPC) (*DecodedMessage, error){
        msg := &Message{}

        dec := gob.NewDecoder(rpc.Payload)
        err := dec.Decode(&msg)
        if err != nil {
            return nil, fmt.Errorf("failed to decode message from %s: %s", rpc.From, err)
        }

        logrus.WithFields(logrus.Fields{
            "from": rpc.From,
            "type": msg.Header,
        }).Debug("New incoming message")

        switch msg.Header{
            case MessageTypeTx:
                tx := new(core.Transaction)
                if err := tx.Decode(core.NewGobTxDecoder(bytes.NewReader(msg.Data))); err != nil {
                    return nil,err 
                }
            return &DecodedMessage{
                From :rpc.From,
                Data : tx,
            }, nil
        default:
            return nil,fmt.Errorf("invalid message header %x", msg.Header)
    }

}

type RPCProcessor interface{
    ProcessMessage(msg *DecodedMessage) error
}

Summary of Understandings:

  • The code defines structures and functions related to handling RPC messages in a blockchain network.

  • RPC struct encapsulates the sender and payload of an RPC message.

  • MessageType enum specifies different types of messages (e.g., transactions or blocks).

  • Message struct represents a message with a type header and data payload.

  • RPCHandler and RPCProcessor interfaces provide blueprints for handling and processing RPC messages.

  • RPCDecodeFunc is a function type for decoding RPC messages, and DefaultRPCDecodeFunc is a default implementation.

  • The code demonstrates a modular approach to handling and decoding RPC messages, with a focus on supporting transactions in the default implementation.


Now, we use this RPC structure to implement it in server,

  1. ServerOpts struct:
  • Represents options for configuring the Server, including properties such as RPCDecodeFunc and RPCProcessor.
  1. NewServer function:

    • Constructor function for creating a new Server instance with the specified options.

    • Sets default values for BlockTime, RPCDecodeFunc, and RPCProcessor if not provided.

  2. Start method (on Server):

    • Initializes transports and starts the server loop.

    • Listens for messages on the rpcChan channel, decodes and processes RPC messages, and handles other cases.

    • Uses a ticker to trigger certain actions based on the block time.

  3. ProcessMessage method (on Server):

    • Implements the RPCProcessor interface for processing decoded messages.

    • Switches on the type of the decoded message's data and calls the corresponding processing function (e.g., processTransaction for transactions).

  4. processTransaction method (on Server):

    • Processes a transaction by verifying it, checking if it's already in the mempool, setting the first seen timestamp, logging information, broadcasting the transaction, and adding it to the mempool.

    • Runs concurrently (using go) to handle these tasks without blocking the main execution.

network/server.go

type ServerOpts struct {
    //..other properties
    RPCDecodeFunc RPCDecodeFunc
    RPCProcessor RPCProcessor
}

func NewServer(opts ServerOpts) *Server {
//other code
    if opts.BlockTime == time.Duration(0) {
        opts.BlockTime = DefaultBlockTime
    }

    if opts.RPCDecodeFunc == nil {
        opts.RPCDecodeFunc = DefaultRPCDecodeFunc
    }

    if s.RPCProcessor == nil {
        s.RPCProcessor = s
    }
    return s
}

func (s *Server) Start() {
    s.initTransports()
    ticker := time.NewTicker(s.BlockTime)

free:
    for {
        select {
        case rpc := <-s.rpcChan:
            decodedMessage, err := s.RPCDecodeFunc(rpc)
            if err != nil {
                logrus.Error(err)
            }
            if err := s.RPCProcessor.ProcessMessage(decodedMessage); err!=nil {
                logrus.Error(err)
            }
        // other cases
    }

    fmt.Println("Server shut down")
}

func (s *Server) ProcessMessage (decodedMsg *DecodedMessage) error{

    switch t := decodedMsg.Data.(type) {
        case *core.Transaction:
            return s.processTransaction(t)
    }
    return nil
}

func (s *Server) processTransaction(tx *core.Transaction) error {

    hash := tx.Hash(core.TxHasher{})

    if s.memPool.Has(hash) {
        logrus.WithFields(
            logrus.Fields{
                "hash": hash,
            }).Info("transaction already in mempool")
    }

    if err := tx.Verify(); err != nil {
        return err
    }

    tx.SetFirstSeen(time.Now().UnixNano())

    logrus.WithFields(logrus.Fields{
        "hash": hash,
        "mempool length": s.memPool.Len(),
    }).Info("adding new tx to the mempool")

    go s.broadcastTx(tx)
    //add tx to other peers

    return s.memPool.Add(tx)
}

Summary of Understandings:

  • The ServerOpts struct contains options for configuring the Server, including functions for decoding RPC messages (RPCDecodeFunc) and processing messages (RPCProcessor).

  • The NewServer function constructs a new Server instance with default values if certain options are not provided.

  • The Start method initiates server processes, including handling RPC messages, using a ticker for periodic tasks.

  • The ProcessMessage method implements the RPCProcessor interface, dispatching specific processing tasks based on the type of the decoded message.

  • The processTransaction method handles the processing of transactions, including verification, mempool checks, timestamp setting, logging, broadcasting, and mempool addition.

We did broadcastTx but we have not implement it, we'll do it next. But as of now we are checking our functionality without broadcasting it.


Testing

We are sending transaction as RPC message,

The sendTransaction function generates a new random transaction, signs it with a freshly generated private key, encodes the transaction using the gob encoder, and then sends the encoded transaction as a message to a specified network address using a given transport (network.Transport). The function is designed to simulate sending transactions over a network in a blockchain system.

network/main.go

package main

import (
    "ProjectX/core"
    "ProjectX/crypto"
    "ProjectX/network"
    "bytes"
    "math/rand"
    "strconv"
    "time"

    "github.com/sirupsen/logrus"
)

func main() {
    trLocal := network.NewLocalTransport("LOCAL")
    trRemote := network.NewLocalTransport("REMOTE")

    trLocal.Connect(trRemote)
    trRemote.Connect(trLocal)

    go func() {
        for {
            if err := sendTransaction(trRemote, trLocal.Addr()); err != nil {
                logrus.Error(err)
            }
            time.Sleep(1 * time.Second)
        }
    }()

    Pk := crypto.GeneratePrivateKey()
    opts := network.ServerOpts{
        Transports: []network.Transport{trLocal},
        PrivateKey : &Pk,
    }

    s := network.NewServer(opts)

    s.Start()
}

func sendTransaction(tr network.Transport, to network.NetAddr) error {
    Pk := crypto.GeneratePrivateKey()
    tx := core.NewTransaction([]byte(strconv.FormatInt(int64(rand.Intn(100)),10)))
    tx.Sign(Pk)

    txBytes := &bytes.Buffer{}
    err := tx.Encode(core.NewGobTxEncoder(txBytes))
    if err != nil {
        return err
    }

    msg := network.NewMessage(network.MessageTypeTx, txBytes.Bytes())

    return tr.SendMessage(to, msg.Bytes())
}

let's run,

make run

Running fine!

Here's how it's happening..!!

This chart gives clear representation that how it's working in big picture.


Broadcast Transaction

Broadcasting transaction is one of the important responsibility of server. Generally, when server receives Transaction, It verifies it and broadcast it to it's peers. We are going to broadcast anything in bytes to make it more modular that even if we have to broadcast anything other than message or transaction in future, we can do it by just encoding it and passing it to broadcast method.

Let's implement it.

As it is responsibility of transport layer to broadcast message, We added Broadcast([]byte) error function which means Every transport have to have broadcast feature.

network/transport.go

type Transport interface {
    // other
    Broadcast([]byte) error
}

We added Broadcast method definition for broadcast bytes.

network/local_transport.go

func (t *LocalTransport) Broadcast(payload []byte) error {
    for _, p := range t.peers{
        err := t.SendMessage(p.Addr(), payload)
        if err != nil {
            return err
        }
    }
    return nil
}

Testing

  • Setup:

    • Creates three instances of a local transport (tra, trb, trc), each identified by a unique label ("A," "B," "C").

    • Connects transport tra to transports trb and trc.

  • Broadcast:

    • Calls the Broadcast method on transport tra, simulating the broadcast of a message (in this case, the message is a byte slice with the content "hello world").
  • Verification:

    • Listens for incoming messages on transports trb and trc using the Consume method.

    • Verifies that both trb and trc receive the same message that was broadcasted by tra.

    • Compares the received message content with the expected "hello world."

  • Assertions:

    • checking for a successful connection between transports, successful broadcast, and the correctness of the received messages

network/local_transport_test.go

func TestBroadcast(t *testing.T) {
    tra := NewLocalTransport("A").(*LocalTransport)
    trb := NewLocalTransport("B").(*LocalTransport)
    trc := NewLocalTransport("C").(*LocalTransport)

    assert.Nil(t, tra.Connect(trb))
    assert.Nil(t, tra.Connect(trc))

    message := []byte("hello world")

    assert.Nil(t, tra.Broadcast(message))

    rpcb := <-trb.Consume()
    b, err := io.ReadAll(rpcb.Payload)
    assert.Nil(t, err)
    assert.Equal(t, b, message)

    rpcc := <-trc.Consume()
    b, err = io.ReadAll(rpcc.Payload)
    assert.Nil(t, err)
    assert.Equal(t, b, message)
}
make test

YEAH, IT'S WORKING!!

The following blog post will explore the code related to Block creation


In this blog series, I'll be sharing code snippets related to blockchain architecture. While the code will be available on my GitHub, I want to highlight that the entire architecture isn't solely my own. I'm learning as I go, drawing inspiration and knowledge from various sources, including a helpful YouTube playlist that has contributed to my learning process.

Did you find this article valuable?

Support Siddharth Patel by becoming a sponsor. Any amount is appreciated!