Aggregate and Count Real-Time Trades with Elixir Maps

In the Real-time Market-Data Updates with Elixir article, we have seen how to build a Coinbase WebSocket client and receive real-time trades. In the comments of the article, a reader asked how many trades we receive per second. In general, the rate of trades depends on the product and the time of day.

In this article we start to see how to aggregate these trades in real-time, using the GenServer behaviour. We’ll build an aggregation process that groups and count the  trades.

Coinbase.Client

Let’s download the code we wrote in the previous article. You can find it at this GitHub repository, poeticoding/learning_coinbase.

$ git clone https://github.com/poeticoding/learning_coinbase

git clone git@github.com:poeticoding/learning_coinbase.git
Cloning into 'learning_coinbase'...
....

$ cd learning_coinbase && git checkout 45f34e5

To start with the same code we ended with in the previous article, we do a checkout to the part1 release (commit 45f34e5).

Inspecting the module Coinbase.Client ( client.ex ), we see that everytime we receive a message, we decode it to a Map and then we print it only if the "type" is "match", which means it’s a trade.

def handle_frame({:text, msg}, state) do
  handle_msg(Jason.decode!(msg), state)
end

def handle_msg(%{"type" => "match"} = trade, state) do
  IO.inspect(trade)
  {:ok, state}
end

Now, instead of start writing our aggregation code in the Coinbase.Client module, it’s better to decouple the aggregation code and create a new module for it. The aggregation module we are going to implement will receive the trades from the client, using the function new_trade(trade).

Aggregation – Counting the Trades

We can receive multiple trades within the same second. We see in the image below that the trade 1,2 and 3 are in the same second and trade 4 and 5 in another second.

Grouping Trades

Looking at a trade example, we see that we have a time string we can use to group the trades.

%{
  "maker_order_id" => "ab678037-b931-4896-8310-8e2a91efc3aa",
  "price" => "3588.00000000",
  "product_id" => "BTC-USD",
  "sequence" => 7675147771,
  "side" => "sell",
  "size" => "0.05333699",
  "taker_order_id" => "df54a7ae-5fc5-4bc4-8db8-635e632c8597",
  "time" => "2018-12-28T00:02:23.809000Z",
  "trade_id" => 56786993,
  "type" => "match"
}

To convert the "time" to a DateTime struct we use the function DateTime.from_iso8601/2

DateTime.from_iso8601("2018-12-28T00:02:23.809000Z")
{:ok, #DateTime<2018-12-28 00:02:23.809000Z>, 0}

The Elixir DateTime struct along with a Map are useful to easily group the trades made in the same second. With Elixir DateTime we can transform the time string to a date-time tuple with seconds precision, leaving out the milliseconds.

{:ok, dt, _} = DateTime.from_iso8601("2018-12-28T00:02:23.809000Z")
key = {dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second}
{2018, 12, 28, 0, 2, 23}

This tuple will then be used as a key of the grouping map we’ll see later.

Aggregation Module

Let’s start to implement our aggregation module.

defmodule Coinbase.Aggregation do
  use GenServer

  def start_link([]) do
    GenServer.start_link(__MODULE__, :ok, name: Coinbase.Aggregation)
  end

  def init(:ok), do: {:ok, %{}}

end

We implement a GenServer behaviour and, for simplicity, we start the process with the name Coinbase.Aggregation. Since we don’t need to pass any parameter to the init function ,  we pass just an :ok. The init(:ok) function sets the state, which is an empty Map.

Our goal is to count the number of trades per second, updating the count for each trade we receive, creating an Elixir Map like this

%{
  {2018, 12, 28, 0, 2, 23} => 2,
  {2018, 12, 28, 0, 2, 24} => 1,
  {2018, 12, 28, 0, 2, 25} => 5,
  ...
}

Let’s implement the handle_cast/2 function. We want to receive asynchronously the trade messages from the client and we don’t need to return any result to the client.

# Coinbase.Aggregation - lib/coinbase/aggregation.ex
def handle_cast({:new_trade, %{"time" => time}}, %{}=counts) do
  {:ok, dt, _} = DateTime.from_iso8601(time)
  key = {dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second}
  updated_counts = Map.update(counts, key, 1, fn v-> v+1 end)
  {:noreply, updated_counts}
end
  • line 16: we implement the handle_cast/2 function that pattern matches the message {:new_trade, %{"time" => time}}. In this way, we extract the value time, ready to be used inside the function. counts is our state, where we aggregate the number of trades.
  • line 17: we convert the time string to a DateTime struct dt, which we use to build, in line 18, a key that represents the date-time (with seconds precision) of that trade.
  • line 19: then we update the counter inside the map for that specific key. The function we use Map.update(map, key, initial, fun) accepts an initial parameter which is the initial value in case the key is not present in the map. When the map has already a key with a count for that specific second, the update/4 function updates the value increasing it by 1.
  • line 20: we then update the state returning the updated_counts Map.

Interface

The Coinbase.Client could use directly the GenServer.cast/2 function to send messages to the Coinbase.Aggregation process. In general is much better to write a public interface in the Coinbase.Aggregation module, made by functions which deal with the messaging.

# Coinbase.Aggregation - lib/coinbase/aggregation.ex
def new_trade(pid, trade),
  do: GenServer.cast(pid, {:new_trade, trade})

def new_trade(trade),
  do: new_trade(Coinbase.Aggregation, trade)

The new_trade(pid, trade) sends the message {:new_trade, trade} to the process pid. Since we start the process with the fixed name Coinbase.Aggregation, we add also a function new_trade/1 that uses the process name as a pid.

The Coinbase.Client module can now use the Coinbase.Aggregation.new_trade/1 function to send the trade to the aggregation process.

# Coinbase.Client - lib/coinbase/client.ex
def handle_msg(%{"type" => "match"} = trade, state) do
  Coinbase.Aggregation.new_trade(trade)
  {:ok, state}
end

You find the aggregation and client full code here: aggregation.ex and client.ex

Supervision

In the previous article we’ve already setup a simple supervisor, which starts and supervises Coinbase.Client.  We need now to start the Coinbase.Aggregation process before we start the client. To do so, we just need to add our module as the first element of the children list

# Coinbase.Application - lib/coinbase/application.ex
def start(_type, _args) do
  children = [
    {Coinbase.Aggregation, []},
    {Coinbase.Client, ["BTC-USD"]}
  ]

  opts = [strategy: :one_for_one, name: Coinbase.Supervisor]
  Supervisor.start_link(children, opts)
end

Full code: application.ex

Inspecting the Aggregation state

It’s now time to run our application on iex.

$ iex -S mix
...
connected!
iex(1)>

The interactive shell starts and the only message we see is “connected!”. To understand what’s going on inside the aggregation process we can use the Erlang function :sys.get_state/1, which returns the state of a process.

iex> :sys.get_state(Coinbase.Aggregation)
%{
  {2018, 12, 28, 12, 1, 30} => 1,
  {2018, 12, 28, 12, 1, 31} => 1,
  {2018, 12, 28, 12, 1, 32} => 5,
  {2018, 12, 28, 12, 1, 33} => 2,
  {2018, 12, 28, 12, 1, 38} => 12
}

Great! This is what we wanted to achieve: a map with the number of trades for each second. Let’s see if we can increase the number of trades adding some other products.

# Coinbase.Application - lib/coinbase/application.ex
def start(_type, _args) do
  children = [
    {Coinbase.Aggregation, []},
    {Coinbase.Client, ["BTC-USD", "BTC-EUR", "BTC-GBP",
          "ETH-USD", "ETH-EUR", "ETH-GBP",
          "LTC-USD", "LTC-EUR", "LTC-GBP",
          "BTC-USD"]}
  ]
  ...

In this way, we will receive the trades for all these products. We are going to count all the trades ignoring which product they refer to.

iex> :sys.get_state Coinbase.Aggregation
%{
  {2018, 12, 28, 12, 16, 45} => 2,
  {2018, 12, 28, 12, 16, 47} => 1,
  {2018, 12, 28, 12, 16, 48} => 5,
  {2018, 12, 28, 12, 16, 49} => 7,
  {2018, 12, 28, 12, 16, 50} => 1,
  {2018, 12, 28, 12, 16, 51} => 1,
  {2018, 12, 28, 12, 16, 52} => 1,
  {2018, 12, 28, 12, 16, 53} => 1,
  {2018, 12, 28, 12, 16, 54} => 2,
  {2018, 12, 28, 12, 16, 55} => 13,
  {2018, 12, 28, 12, 16, 56} => 2,
  {2018, 12, 28, 12, 16, 57} => 3
}

Still not so many trades. How can we increase the number of messages?

Subscribing to the Level2 channel

If we want to drastically increase the number of messages we receive from Coinbase, we can also follow the level2 channel. This channel sends us a message for any change in the order book.  The order book is where people (or bots!) place their buy or sell orders.

Order Book

To make this work, we need to refactor a little bit our code.
At first, we add the level2 channel to the subscription message in the Coinbase.Client module.

# Coinbase.Client - lib/coinbase/client.ex
defp subscription_frame(products) do
  subscription_json = %{
    type: "subscribe",
    product_ids: products,
    channels: ["matches", "level2"]
  }
  |> Jason.encode!()

  {:text, subscription_json}
end

Our handle_msg/2 accepts only the trades imposing the condition "type" => "match". We change this condition and just verify that the message has the "time" key, which is needed for the counting part.

# Coinbase.Client - lib/coinbase/client.ex
def handle_msg(%{"time" => _} = msg, state) do
  Coinbase.Aggregation.new_message(msg)
  {:ok, state}
end

We’ve also changed the name of the Coinbase.Aggregation function, from new_trade(trade) to something more generic like new_message(msg).

You can see the refactored Client module code at this link: client.ex

Let’s now refactor the Coinbase.Aggregation module.

# Coinbase.Aggregation - lib/coinbase/aggregation.ex
def new_message(msg),
  do: new_message(Coinbase.Aggregation, msg)

def new_message(pid, msg),
  do: GenServer.cast(pid, {:new_message, msg})

def handle_cast({:new_message, %{"time" => time}}, %{}=counts) do
...

We see that the refactoring is really minimal. We have just renamed the interface functions and changed the message from {:new_trade, trade} to {:new_message, msg}.

Refactored aggregation code at this link: aggregation.ex

Let’s see now if the number of messages we receive from Coinbase is increased.

$ iex -S mix
connected!
iex> :sys.get_state Coinbase.Aggregation
%{
  {2018, 12, 28, 14, 13, 51} => 198,
  {2018, 12, 28, 14, 13, 52} => 218,
  {2018, 12, 28, 14, 13, 53} => 222,
  ...
}

Perfect! The increase is quite noticeable.

If you want to get even more messages try the full channel

GitHub Repo

At this commit you find the full working code of this article’s implementation.

Wrapping Up

In this article, we started to use the real-time data coming from  Coinbase, with a simple aggregation process that counts the trades in the same second. We’ve used a Map to group the trades made in the same second and we’ve seen the result printing the aggregation process state. This gives us an idea of the rate of trades/messages we receive from the server, but is obviously far from ideal.

Using a Map makes the grouping easy, but to have an average rate we should consider the empty seconds (the seconds with no trades) and calculate the average count of the last 10-20 seconds. The keys in a Map are not ordered and if we want to see just the last 10-20 seconds we need to sort these keys, which could be a computationally expensive operation, especially when we have thousands of keys. In further articles, we will see how to count the trades using Lists and Queues.