Realtime Market-Data Updates with WebSocket and Elixir

Cryptocurrency exchanges usually offer their realtime feeds for free and, as with Coinbase Pro, often without requiring an account. This gives us a great way to build an architecture around realtime market data.
In this article we see how to build an Elixir application that receives realtime updates from the Coinbase websocket feed, handling crashes and disconnections.

Coinbase Elixir Application

Realtime Crypto Market Data

Coinbase is one of the best-known and most widely used exchanges for trading cryptocurrencies. Like many other crypto exchanges, it provides a realtime feed that streams the different events happening on the exchange. If we register with the service, we see a trading view like the one below:

Coinbase Pro trading interface – Trade History

We mainly focus on the stream of new trades in the Trade History column at the bottom right, which shows the trades of the last few minutes in descending order. The last trade price in the top bar is $3356.01, which reflects the trade in the first row, executed at 12:36:30. For now, the only data we want to download is this stream of trades.
Let’s take a look at the documentation of the websocket-feed.

First we need to connect to the websocket service using the wss://ws-feed.pro.coinbase.com URI.
Once connected, we just need to send a subscription JSON message like this one:

{
    "type": "subscribe",
    "product_ids": [
        "BTC-USD"
    ],
    "channels": [
        "matches"
    ]
}

There are many channels: level2, ticker, heartbeat, matches, etc. We focus only on the matches channel, which represents the stream of trades. In this case we subscribe to the “BTC-USD” product, but we could subscribe to many different products at the same time.

WebSocket Connection

Let’s create a new Elixir application called coinbase, using the --sup option, which creates an OTP application skeleton for us, including a supervision tree. You can find the full code at this GitHub repo.

$ mix new coinbase --sup

Now we need a websocket client. I’ve been using WebSockex for the last few months and, as of now, it’s actively maintained. Let’s add it to the dependencies in the mix.exs file, along with the Jason library, which we will need to encode and decode JSON strings.

# coinbase/mix.exs
defp deps do
  [
    {:websockex, "~> 0.4.2"},
    {:jason, "~> 1.1"}
  ]
end

Then run the deps.get task to download the dependencies.

$ mix deps.get
Resolving Hex dependencies...
Dependency resolution completed:
Unchanged:
  jason 1.1
  websockex 0.4.2
* Getting websockex (Hex package)
* Getting jason (Hex package)

Great, we can now start building our Coinbase client using WebSockex.
Let’s focus first on the client’s connection:

defmodule Coinbase.Client do
  use WebSockex

  @url "wss://ws-feed.pro.coinbase.com"

  def start_link(_product_ids \\ []) do
    WebSockex.start_link(@url, __MODULE__, :no_state)
  end

  def handle_connect(_conn, state) do
    IO.puts("connected!")
    {:ok, state}
  end
end

We’ve created a Coinbase.Client module that simply connects to the Coinbase server.

  • line 2: use WebSockex injects the WebSockex functionality into our module.
  • line 7: WebSockex.start_link/3 starts a websocket client in a separate process and returns an {:ok, pid} tuple.
  • line 10: the handle_connect/2 callback is called once the client has connected.

Let’s run it in Elixir’s interactive shell:

$ iex -S mix

iex> {:ok, _pid} = Coinbase.Client.start_link []
connected!
{:ok, #PID<0.393.0>}

Subscription Frame

Our module connects correctly to the Coinbase websocket server. To be able to receive the trades through the websocket connection, we need to subscribe to the matches channel.

Subscription Message
def subscription_frame(products) do
  subscription_msg =
    %{
      type: "subscribe",
      product_ids: products,
      channels: ["matches"]
    }
    |> Jason.encode!()

  {:text, subscription_msg}
end

The subscription_frame/1 function returns a frame ready to be sent to the server. A frame is a {type, data} tuple where, in our case, type is :text and data is a JSON string.
The products argument is a list of Coinbase product ids, which are strings like "BTC-USD", "ETH-USD", "LTC-USD".
We build the subscription message as a map with type, product_ids and channels keys. The map is then converted to a JSON string with the Jason.encode!/1 function. The returned frame looks like this:

{:text, "{\"type\":\"subscribe\",\"product_ids\":[\"BTC-USD\"],\"channels\":[\"matches\"]}"}

We then add a subscribe/2 function to our module, which we will use once connected.

def subscribe(pid, products) do
  WebSockex.send_frame(pid, subscription_frame(products))
end

The subscribe/2 function builds the {:text, json_msg} subscription frame with the subscription_frame/1 function we’ve just seen, and sends it to the server with WebSockex.send_frame(pid, frame), where pid is the websocket client’s process id.

Handling WebSocket Frames

Once we are subscribed, the Coinbase server starts sending us messages in JSON format. Now we need to implement the handle_frame/2 callback, which is called whenever the client receives data from the server.

def handle_frame(_frame={:text, msg}, state) do
    msg
    |> Jason.decode!()
    |> IO.inspect()
    {:ok, state}
end

We’ve just implemented a simple handle_frame(frame, state) callback in which we:

  • pattern match the frame, extracting the JSON message string
  • decode it using Jason.decode!/1, which converts the JSON string to a Map
  • print the map to the standard output

The callback returns an {:ok, new_state} tuple, which lets us update the process state between frames, much like the state returned by a GenServer callback.
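To see what returning {:ok, new_state} makes possible, here is a minimal sketch that runs in plain Elixir, without WebSockex or Jason. The TradeCounter module and its handle_decoded/2 function are hypothetical. They receive already-decoded maps (sample messages built from trade ids shown in this article) and count the trades in the state. Enum.reduce/3 plays the role WebSockex has between frames: it feeds the state returned by one call into the next.

```elixir
# Hypothetical module: stands in for the client callbacks, but receives
# already-decoded maps so it runs without WebSockex or Jason.
defmodule TradeCounter do
  # A "match" message is a live trade: return an updated state.
  def handle_decoded(%{"type" => "match"}, %{trades: count} = state) do
    {:ok, %{state | trades: count + 1}}
  end

  # Any other message leaves the state untouched.
  def handle_decoded(_msg, state), do: {:ok, state}
end

# Sample decoded messages (ids taken from the outputs in this article).
messages = [
  %{"type" => "last_match", "trade_id" => 56138234},
  %{"type" => "subscriptions"},
  %{"type" => "match", "trade_id" => 56138235},
  %{"type" => "match", "trade_id" => 56140459}
]

# Thread the state through successive calls, the way WebSockex passes the
# state returned by one callback into the next one.
final_state =
  Enum.reduce(messages, %{trades: 0}, fn msg, state ->
    {:ok, new_state} = TradeCounter.handle_decoded(msg, state)
    new_state
  end)

IO.inspect(final_state)
# prints %{trades: 2}
```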

We are ready to test our client in iex, connecting and subscribing manually, hoping to see some trades flowing in.

iex> products = ["BTC-USD"]
iex> {:ok, pid} = Coinbase.Client.start_link products
connected!
{:ok, #PID<0.200.0>}

iex> Coinbase.Client.subscribe pid, products

%{
  ....
  "price" => "3562.91000000", "product_id" => "BTC-USD",
  "side" => "sell", "size" => "0.01341129",
  "time" => "2018-12-18T14:43:13.254000Z","trade_id" => 56138234,
  "type" => "last_match",   
}
%{
  "channels" => [%{"name" => "matches", "product_ids" => ["BTC-USD"]}],
  "type" => "subscriptions"
}
%{
  "maker_order_id" => "......",
  "price" => "3562.91000000",
  "product_id" => "BTC-USD",
  "sequence" => .....,
  "side" => "sell",
  "size" => "0.13762144",
  "taker_order_id" => ".....",
  "time" => "2018-12-18T14:43:15.177000Z",
  "trade_id" => 56138235,
  "type" => "match"
}
...

It’s working! We are receiving data from the server.

The first message is always of type "last_match", which is needed when we have a recovery process that downloads the trades missed after a disconnection (beyond the scope of this article). There is also a confirmation of our subscription, with "type" => "subscriptions". But what we are really interested in are the "type" => "match" messages, which are the live trades.
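Each "match" message arrives as a map of strings and numbers, so it can be handy to turn trades into a struct. The sketch below shows one way to do it, using a hypothetical Trade struct that is not part of the client we are building. It parses prices and sizes with String.to_float/1 for brevity. Floats can introduce rounding, so a decimal library may be a better fit for real money amounts. The timestamp is parsed with DateTime.from_iso8601/1.

```elixir
# Hypothetical struct for a single trade (not part of the original client).
defmodule Trade do
  defstruct [:trade_id, :product_id, :side, :price, :size, :time]

  # Builds a Trade from a decoded "match" message.
  # String.to_float/1 keeps the example short; floats may round prices.
  def from_match(%{"type" => "match"} = msg) do
    # Timestamps ending in "Z" parse with a UTC offset of 0 seconds.
    {:ok, time, 0} = DateTime.from_iso8601(msg["time"])

    %__MODULE__{
      trade_id: msg["trade_id"],
      product_id: msg["product_id"],
      side: msg["side"],
      price: String.to_float(msg["price"]),
      size: String.to_float(msg["size"]),
      time: time
    }
  end
end

# A "match" message as printed earlier in this article (order ids trimmed).
msg = %{
  "price" => "3562.91000000",
  "product_id" => "BTC-USD",
  "side" => "sell",
  "size" => "0.13762144",
  "time" => "2018-12-18T14:43:15.177000Z",
  "trade_id" => 56138235,
  "type" => "match"
}

trade = Trade.from_match(msg)
IO.inspect(trade.price)
# prints 3562.91
```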

To make the subscription automatic, we need to change the Coinbase.Client.start_link function:

def start_link(products \\ []) do
  {:ok, pid} = WebSockex.start_link(@url, __MODULE__, :no_state)
  subscribe(pid, products)
  {:ok, pid}
end

This way we start the WebSockex process with WebSockex.start_link/3, which connects to the server and returns the pid. We then use this pid to subscribe, calling the subscribe/2 function implemented before, and finally return the {:ok, pid} tuple.

Filtering the trades is really easy with pattern matching. We can’t pattern match on the message directly in the handle_frame/2 function, since msg is a JSON string that we first need to convert into a map with Jason.decode!/1. So we define a new handle_msg/2 function:

def handle_frame({:text, msg}, state) do
  handle_msg(Jason.decode!(msg), state)
end

def handle_msg(%{"type" => "match"}=trade, state) do
  IO.inspect(trade)
  {:ok, state}
end

def handle_msg(_, state), do: {:ok, state}

The first handle_msg/2 clause only matches maps with "type" => "match", so it is called only for trades.
The second clause, handle_msg(_, state), works as a catch-all that ignores all other messages by returning {:ok, state}.

This works like a filter and is equivalent to using case:

def handle_msg(msg, state) do
  case msg do
    %{"type" => "match"}=trade -> IO.inspect(trade)
    _ -> :ignore
  end
  {:ok, state}
end
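The same pattern is not tied to callbacks. As a small sketch, here is how a list of already-decoded messages (trimmed from the output above) could be filtered with match?/2, using exactly the pattern of the first handle_msg/2 clause:

```elixir
# Decoded messages, trimmed from the iex output shown earlier.
messages = [
  %{"type" => "last_match", "trade_id" => 56138234},
  %{
    "type" => "subscriptions",
    "channels" => [%{"name" => "matches", "product_ids" => ["BTC-USD"]}]
  },
  %{"type" => "match", "trade_id" => 56138235}
]

# match?/2 uses the same pattern as the first handle_msg/2 clause.
trades = Enum.filter(messages, &match?(%{"type" => "match"}, &1))

IO.inspect(Enum.map(trades, & &1["trade_id"]))
# prints [56138235]
```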

Let’s go back to iex and see what happens now when we simply start our Coinbase client:

iex> Coinbase.Client.start_link ["BTC-USD"]
connected!
{:ok, #PID<0.184.0>}
%{
  "maker_order_id" => "9050ea18-440f-442e-9129-358d77351685",
  "price" => "3531.42000000",
  "product_id" => "BTC-USD",
  "sequence" => 7584073701,
  "side" => "sell",
  "size" => "0.04000084",
  "taker_order_id" => "e5df8af2-20d0-4592-af98-e41bbb2ed8a9",
  "time" => "2018-12-18T15:20:01.237000Z",
  "trade_id" => 56140459,
  "type" => "match"
}
Coinbase VS Terminal

Great, it works!

You can find the full code at this link: client.ex

Elixir Supervision

We can monitor disconnections by implementing the handle_disconnect(conn, state) callback:

def handle_disconnect(_conn, state) do
  IO.puts "disconnected"
  {:reconnect, state}
end

By returning {:reconnect, state} we ask WebSockex to reconnect using the same process. Unfortunately this solution isn’t the best in our case, because the subscription happens inside the Coinbase.Client.start_link function, which is not called again when the connection is restored.
We opt instead to let the process exit, returning {:ok, state} instead of {:reconnect, state}.

def handle_disconnect(_conn, state) do
  IO.puts "disconnected"
  {:ok, state}
end

It’s easy to test a disconnection: just switch off the wifi (or unplug the ethernet cable) and wait for the client to hit a timeout error.

iex> Coinbase.Client.start_link ["BTC-USD"]
...
%{"type" => "match", ...}
%{"type" => "match", ...}
...
# switch off the connection and wait

15:33:51.839 [error] [83, 83, 76, 58, 32, 83, 111, 99, 107, 101, 116, 32, 101, 114, 114, 111, 114, 58, 32, 'etimedout', 32, 10]
disconnected
** (EXIT from #PID<0.182.0>) shell process exited with reason: {:remote, :closed}
iex>

Since we’ve used the WebSockex.start_link function, the WebSockex process was linked to our iex shell process. When the client exited with the {:remote, :closed} reason, the shell process was taken down too.
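The effect of the link can be reproduced without any network. In this sketch a plain spawned process stands in for the WebSockex client and exits with the same {:remote, :closed} reason. Unlike the iex session, the calling process traps exits here. The exit signal therefore arrives as an {:EXIT, pid, reason} message instead of taking the caller down:

```elixir
# Trap exits so that a linked process's exit signal is turned into a message
# instead of killing this process (the iex shell above did not trap exits).
Process.flag(:trap_exit, true)

# Stand-in for the WebSockex client: a linked process that exits with the
# same reason seen in the log above.
pid = spawn_link(fn -> exit({:remote, :closed}) end)

reason =
  receive do
    {:EXIT, ^pid, reason} -> reason
  after
    1_000 -> :timeout
  end

IO.inspect(reason)
# prints {:remote, :closed}
```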

To cope with the client’s disconnections and crashes, we need to add some basic supervision. When we created the project, we passed the --sup option to the mix command, which created a supervision skeleton we can now use.
Let’s open the application.ex file and add the processes we want to supervise.

defmodule Coinbase.Application do
  use Application

  def start(_type, _args) do
    children = [
      {Coinbase.Client, ["BTC-USD"]}
    ]

    opts = [strategy: :one_for_one, name: Coinbase.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

At line 6, we’ve added the {Coinbase.Client, ["BTC-USD"]} tuple. The supervisor then starts our process by calling Coinbase.Client.start_link(["BTC-USD"]) and monitors it. In the case of a disconnection (or a crash), it starts a new client.
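The {Coinbase.Client, ["BTC-USD"]} tuple works because the supervisor calls Coinbase.Client.child_spec(["BTC-USD"]), and the resulting spec tells it to call start_link(["BTC-USD"]). We rely on use WebSockex defining that child_spec/1 for us, which is worth checking against the WebSockex version you use. The runnable sketch below shows the same mechanism with a hypothetical FakeClient GenServer in place of our client, so no connection to Coinbase is needed:

```elixir
# Hypothetical stand-in for Coinbase.Client: a GenServer that just keeps
# the product ids it was started with as its state.
defmodule FakeClient do
  use GenServer

  def start_link(products), do: GenServer.start_link(__MODULE__, products)

  @impl true
  def init(products), do: {:ok, products}
end

# {FakeClient, ["BTC-USD"]} is expanded through FakeClient.child_spec/1,
# which `use GenServer` defines; the spec points at start_link(["BTC-USD"]).
IO.inspect(FakeClient.child_spec(["BTC-USD"]))

{:ok, _sup} =
  Supervisor.start_link([{FakeClient, ["BTC-USD"]}],
    strategy: :one_for_one,
    name: FakeSupervisor
  )

# The supervised child received the product list as its start argument.
[{FakeClient, pid, :worker, _modules}] = Supervisor.which_children(FakeSupervisor)
IO.inspect(:sys.get_state(pid))
# prints ["BTC-USD"]
```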

Supervisor starts and monitors a Client

Running the Application

$ mix run --no-halt

connected!
%{
  ...
  "price" => "3521.20000000",
  ...
  "type" => "match"
}

$ iex -S mix
...
connected!
Interactive Elixir (1.7.3) - press Ctrl+C to exit (type h() ENTER for help)
iex> %{
  ...
  "price" => "3522.85000000",
  ...
  "type" => "match"
}

In both cases the application starts, running the Coinbase.Supervisor which, in turn, starts and monitors the Coinbase.Client.

What happens if we kill the Coinbase.Client process? We can get the pid from the list of processes monitored by the Coinbase.Supervisor and kill it using the Process.exit(pid, reason) function.

[{Coinbase.Client, pid, _, _}]= Supervisor.which_children(Coinbase.Supervisor)
Process.exit(pid, :kill)

Let’s see it in action

$ iex -S mix
...
connected!
...
%{ ..., "price" => "3522.85000000", "type" => "match"}
...


iex> [{Coinbase.Client, pid, _, _}] = Supervisor.which_children(Coinbase.Supervisor)
iex> Process.exit(pid, :kill)
true
iex> connected!
%{ ..., "price" => "3535.00000000", "type" => "match"}

We see that after we kill the client process, the supervisor starts a new client that connects immediately to the Coinbase server.
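The restart can also be checked programmatically. This is a minimal sketch with a hypothetical StubClient GenServer and StubSupervisor in place of Coinbase.Client and Coinbase.Supervisor. The supervisor handles the child’s exit asynchronously, so the code polls Supervisor.which_children/1 for a short while instead of assuming the new pid is available right after Process.exit/2:

```elixir
# Hypothetical stand-in for Coinbase.Client.
defmodule StubClient do
  use GenServer

  def start_link(products), do: GenServer.start_link(__MODULE__, products)

  @impl true
  def init(products), do: {:ok, products}
end

{:ok, _sup} =
  Supervisor.start_link([{StubClient, ["BTC-USD"]}],
    strategy: :one_for_one,
    name: StubSupervisor
  )

# Returns whatever the supervisor currently reports for the child.
current_pid = fn ->
  [{StubClient, pid, _, _}] = Supervisor.which_children(StubSupervisor)
  pid
end

old_pid = current_pid.()
Process.exit(old_pid, :kill)

# Poll until the supervisor reports a different, live pid (or give up).
wait_for_restart = fn wait, attempts ->
  pid = current_pid.()

  cond do
    is_pid(pid) and pid != old_pid -> pid
    attempts == 0 -> raise "child was not restarted"
    true ->
      Process.sleep(10)
      wait.(wait, attempts - 1)
  end
end

new_pid = wait_for_restart.(wait_for_restart, 100)
IO.inspect({old_pid == new_pid, Process.alive?(new_pid)})
# prints {false, true}
```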

Wrap Up

This is an initial, simple implementation of a supervised Coinbase client that we can use to start processing the stream of trades.
We’ve implemented basic supervision, which is great if you don’t mind losing some trades after a disconnection, but it’s not enough if you want a properly fault-tolerant client that’s able to recover lost data. After a disconnection, and before the client has reconnected and is ready to receive trades again, other trades may have happened. In that case we would lose them.

Problem of missing trades during a reconnection

To properly recover the lost trades, one solution is to check the id of the last trade received and then download the missing trades using a different API, such as Coinbase’s Get Trades.
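The first step of that recovery, spotting a gap, can be sketched as a pure function. The TradeGaps module below is hypothetical. It assumes that the trade_id values of a single product are consecutive integers, as the consecutive ids 56138234 and 56138235 in the output above suggest, which is worth confirming in the Coinbase documentation. The ids in the usage lines are made up for illustration. The missing ids it returns would then be downloaded with the Get Trades API, which is not shown here:

```elixir
defmodule TradeGaps do
  # Hypothetical helper: given the last trade_id we processed and the
  # trade_id just received, return the ids of the trades we missed.
  # Assumes trade ids for a single product are consecutive integers.
  def missing_ids(nil, _new_id), do: []

  def missing_ids(last_id, new_id) when new_id > last_id + 1 do
    Enum.to_list((last_id + 1)..(new_id - 1))
  end

  # Consecutive (or repeated/older) ids: nothing is missing.
  def missing_ids(_last_id, _new_id), do: []
end

IO.inspect(TradeGaps.missing_ids(56138235, 56138236))
# prints []
IO.inspect(TradeGaps.missing_ids(56138235, 56138239))
# prints [56138236, 56138237, 56138238]
```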