The code in this article is heavily inspired by the concepts brilliantly explained in the book Designing Data-Intensive Applications by Martin Kleppmann.

Disclaimer: all the code you find in this article and in the GitHub repo is written purely for fun and is meant only as an experiment.

Published articles in the series:

  1. Part 1 (this article)
  2. Part 2

Introduction

Over the last year I've become interested in logs, and in how something so simple can be the solid foundation of databases like Riak and Cassandra and of streaming systems like Kafka.

In this series of articles we will see the different concepts behind a key-value store (Logs, Segments, Compaction, Memtable, SSTable) by implementing a simple engine in Elixir, a great language for building highly concurrent and fault-tolerant architectures.

In this first part we will see:

  • What is a log?
  • Making a KV persistent using a log and an index. Using an example that we will follow throughout the series, crypto market prices and trades, we are going to see how to store values in a log and retrieve them using an index.
  • LogKV in Elixir. An initial, super-simple implementation in Elixir of a Writer, an Index and a Reader.

What is a log?

Let's think about the most common type of log file, the one we use every day to debug events and error messages in our applications. This simple file has an interesting property: it's an append-only file. This means that only sequential writes are allowed and everything we write to the log is immutable.

Why could sequential writes be important for us? Well… speed!

Random vs Sequential Access - The Pathologies of Big Data

As the figure above shows, there is a huge difference between random and sequential access on classic magnetic disks, on SSDs and even in memory. So the idea is to leverage the speed of sequential access by using an append-only file to save the data of our key-value store.

Using a Log to implement a simple KV store

Let’s start with a simple example. Let's consider a real-time application where we have to store the latest price in dollars of Bitcoin (BTC), Ethereum (ETH) and Litecoin (LTC).

Key   Value (USD)
BTC   4478.12
ETH   133.62
LTC   33.19

If we just need to keep this snapshot in memory, in Elixir we can use a Map. But persistence is another story. There are many different ways and technologies we could use to store this map and make it persistent.

If this snapshot were updated just a few times a day, for just a few currencies, serialising the map into a file would be fine and easy to do, but this is obviously not our case! Our imaginary crypto application needs to keep track of every market movement, with hundreds of updates per second for hundreds of currencies.
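For comparison, here is a minimal sketch of that naive approach, assuming we simply rewrite the whole file on every save. The module name SnapshotStore is hypothetical and not part of LogKV; :erlang.term_to_binary/1 and :erlang.binary_to_term/1 are the standard Erlang serialisation functions.

```elixir
# Hypothetical module: the naive "serialise the whole map" approach.
defmodule SnapshotStore do
  # Serialise the entire map and overwrite the file:
  # every save rewrites all the keys, not just the one that changed.
  def save(map, path) do
    File.write!(path, :erlang.term_to_binary(map))
  end

  # Read the whole file back and decode it into a map.
  def load(path) do
    path |> File.read!() |> :erlang.binary_to_term()
  end
end

prices = %{"BTC" => 4478.12, "ETH" => 133.62, "LTC" => 33.19}
path = Path.join(System.tmp_dir!(), "snapshot.bin")
SnapshotStore.save(prices, path)
SnapshotStore.load(path)
```

Each save costs as much as the whole map, however small the change, which is exactly what hundreds of updates per second can't afford.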

But how can we use an append-only file, where written data is immutable by nature, to store the mutable data of a key-value store, leveraging sequential access and keeping our map persistent?

The idea is pretty simple:

  • append to our log each single price update (value) for any currency (key)
  • use our Map as an index, keeping track of the position and size of the values within our log file.
Concept of key-value persistence using a log
  1. 17:14:59 - LTC trades at 32.85$. We append the string "32.85" to the log and we update the "LTC" key of our index (implemented with a Map) with the value's offset (0, since it's the first value in the file) and its size (5 bytes, since the string "32.85" is made of five 1-byte ASCII characters).

  2. 17:15:00 - ETH trades at 130.98$. We append the string "130.98" to the log and we update the "ETH" key of our index with offset 5 and size 6 bytes.

  3. 17:15:01 - BTC trades at 4411.99$. We append the string "4411.99" to the log and we update the "BTC" key of our index with offset 11 and size 7 bytes.

What happens if we receive a new price for ETH? How can we overwrite the value in the log, since the value we wrote is immutable and we can only append?

  4. 17:15:09 - ETH trades at 131.00$.

Since we can only append if we want to keep the benefit of sequential writes, we simply append the new value and update the index with the new offset and size. The old value is still in the log, but the index no longer points to it.
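The four updates above can be simulated in memory with a short sketch, assuming the log is just a binary and the index a plain Map of key => {offset, size}. LogSketch is a hypothetical name used only for illustration; it is not how the LogKV modules below are structured.

```elixir
# Hypothetical in-memory model of the log and its index.
defmodule LogSketch do
  # Append a value: its offset is the current log size and its size is
  # its byte size. Any older entry for the same key stays in the log.
  def append({log, index}, key, value) do
    offset = byte_size(log)
    size = byte_size(value)
    {log <> value, Map.put(index, key, {offset, size})}
  end
end

events = [{"LTC", "32.85"}, {"ETH", "130.98"}, {"BTC", "4411.99"}, {"ETH", "131.00"}]

{log, index} =
  Enum.reduce(events, {"", %{}}, fn {key, value}, acc ->
    LogSketch.append(acc, key, value)
  end)

IO.inspect(log)   # "32.85130.984411.99131.00"
IO.inspect(index) # %{"BTC" => {11, 7}, "ETH" => {18, 6}, "LTC" => {0, 5}}
```

The second ETH update only moves the index entry from {5, 6} to {18, 6}; nothing already written is touched.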

Reads are efficient too. To retrieve a value from the log, we take its offset and size from the index, and a single disk seek is enough to load the value into memory.
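Using the same in-memory model, a read is a single slice of the log at the indexed offset and size. Here binary_part/3 stands in for the disk seek and read; LogRead is a hypothetical module name.

```elixir
# Hypothetical read path over an in-memory log binary.
defmodule LogRead do
  # Look up {offset, size} in the index, then slice exactly those bytes.
  def get(log, index, key) do
    case Map.fetch(index, key) do
      {:ok, {offset, size}} -> {:ok, binary_part(log, offset, size)}
      :error -> {:error, :not_found}
    end
  end
end

log = "32.85130.984411.99131.00"
index = %{"LTC" => {0, 5}, "ETH" => {18, 6}, "BTC" => {11, 7}}

LogRead.get(log, index, "ETH") # {:ok, "131.00"}
LogRead.get(log, index, "XRP") # {:error, :not_found}
binary_part(log, 5, 6)         # "130.98": the old ETH value is still in the log
```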

LogKV in Elixir

one Writer and Index - multiple Readers 

We are going to write three different Elixir modules.

The Index holds the offset and size for all the keys. It can receive an update request from the Writer and a lookup request from the Reader to get offset and size.

The Writer can only append values to the log and send update requests to the Index. When we want to set a new value for a specific key, we will send a message to the Writer.

The Reader can read values from the log, asking the Index for offset and size information. When we want to look up a key, we will send a message to the Reader.

Some considerations about concurrency

We have only one writer appending to the log file, since multiple parallel writers would race on the file and on the current offset, corrupting the data.
We can have multiple readers for each log file, since this doesn't lead to any concurrency problem. Reading doesn't need any lock and doesn't block the writer, since the log is append-only and the values written are immutable.

LogKV.Index

The index is implemented as a GenServer. Its state is a Map where the keys of the map are the keys we want to store (like "BTC", "ETH", "LTC" etc.) and the values are tuples {offset, size}.

Initialisation of the index

defmodule LogKV.Index do
  use GenServer

  def start_link([]) do
    GenServer.start_link(__MODULE__, :empty, name: __MODULE__)
  end

  def init(:empty), do: {:ok, %{}}
  
end

At first we just need to start the index with an empty state, which is an empty Map. We need only one index, so by specifying the :name option we make sure that only one index, named LogKV.Index, can be started.

Update

def update(key, offset, size) do
  GenServer.call(__MODULE__, {:update, key, offset, size})
end

def handle_call({:update, key, offset, size}, _from, index_map) do
  {:reply, :ok, Map.put(index_map, key, {offset, size})}
end

The update/3 function sends a synchronous request (a call) to the LogKV.Index process to update the {offset, size} tuple for that specific key, where the offset and size are the two values the LogKV.Reader needs to get the value from the log.

We could make it asynchronous with a cast, so the Writer doesn't have to wait for a reply, but then we wouldn't know when the Index has updated the key, and therefore when the Reader can access the updated data. In a few words, a data consistency issue: we write asynchronously with a cast, and a reader may still read the old data.
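For comparison, here is a sketch of the asynchronous variant, written as a separate, hypothetical AsyncIndex module so it doesn't clash with LogKV.Index. The only change is that updates go through GenServer.cast/2 and handle_cast/2.

```elixir
# Hypothetical asynchronous index, shown only for comparison.
defmodule AsyncIndex do
  use GenServer

  def start_link(_opts) do
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end

  # Fire-and-forget: GenServer.cast/2 returns :ok as soon as the message
  # is sent, whether or not the index has applied the update yet.
  def update_async(key, offset, size) do
    GenServer.cast(__MODULE__, {:update, key, offset, size})
  end

  # Same synchronous lookup semantics as LogKV.Index.lookup/1.
  def lookup(key), do: GenServer.call(__MODULE__, {:lookup, key})

  @impl true
  def init(index_map), do: {:ok, index_map}

  @impl true
  def handle_cast({:update, key, offset, size}, index_map) do
    {:noreply, Map.put(index_map, key, {offset, size})}
  end

  @impl true
  def handle_call({:lookup, key}, _from, index_map) do
    reply =
      case Map.fetch(index_map, key) do
        {:ok, offset_size} -> {:ok, offset_size}
        :error -> {:error, :not_found}
      end

    {:reply, reply, index_map}
  end
end
```

If we call update_async/3 and then lookup/1 from the same process, the lookup sees the update, but only because Erlang guarantees message ordering between a given pair of processes. A Reader is a different process from the Writer, so its lookup has no such guarantee relative to the Writer's cast.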

Lookup

def lookup(key) do
  GenServer.call(__MODULE__, {:lookup, key})
end

def handle_call({:lookup, key}, _from, index_map) do
  {:reply, get_key_offset_size(key, index_map), index_map}
end

defp get_key_offset_size(key, index_map) do
  case Map.get(index_map, key) do
    {_offset, _size} = offset_size -> {:ok, offset_size}
    nil -> {:error, :not_found}
  end
end

The lookup/1 function returns

  • {:ok, {offset, size}} if the key exists
  • {:error, :not_found} if the key doesn't exist

I prefer to keep the handle_call light, moving the logic to a separate private function, get_key_offset_size/2.

You can find a complete version of the LogKV.Index module, with doctests, in the LogKV GitHub repo: index.ex


Let's test it on iex

iex> LogKV.Index.start_link([])
{:ok, #PID<0.176.0>}
iex> LogKV.Index.start_link([])
{:error, {:already_started, #PID<0.176.0>}}

First we need to start the Index. We see that only one index can run at a time, since the name is fixed.

iex> LogKV.Index.update("btc",0,10)
:ok

then we update the offset and size of a key

iex> LogKV.Index.lookup("btc")
{:ok, {0, 10}}

retrieve offset and size data from the index

iex> LogKV.Index.lookup("ltc")
{:error, :not_found}

and get a :not_found error when the index doesn't have the key.

LogKV.Writer

The Writer has the responsibility of:

  • creating/opening the log file during the initialisation process
  • appending values and updating the index accordingly

Initialisation

defmodule LogKV.Writer do
  use GenServer
  
  def start_link(log_path) do
    GenServer.start_link(__MODULE__, log_path, name: __MODULE__)
  end
  
  def init(log_path) do
    fd = File.open!(log_path, [:write, :binary])
    {:ok, %{fd: fd, current_offset: 0}}
  end

end

Since for this implementation we need just one writer, for simplicity we force the name to LogKV.Writer when starting the GenServer, as we did for LogKV.Index.

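The init/1 callback opens the file and saves the file's pid (the IO device returned by File.open!/2) and the current_offset into the writer's state. The current offset tells us where we are inside the log file, so we can update the index with the correct absolute offset. Note that opening with :write truncates an existing file, which is consistent with starting from current_offset 0 and an empty in-memory index.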

To keep this first implementation simple, there is no particular error handling here: if the file cannot be opened, File.open!/2 raises an exception.
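If we preferred start_link/1 to return an error instead of raising, a minimal sketch could use File.open/2, which returns {:ok, fd} or {:error, reason}, and stop the GenServer from init/1. LogKV.SafeWriter is a hypothetical module name, not part of the repo, and only the initialisation is shown.

```elixir
# Hypothetical variant of the Writer's initialisation, without exceptions.
defmodule LogKV.SafeWriter do
  use GenServer

  def start_link(log_path) do
    GenServer.start_link(__MODULE__, log_path, name: __MODULE__)
  end

  @impl true
  def init(log_path) do
    # File.open/2 returns {:ok, io_device} or {:error, reason}
    # instead of raising like File.open!/2.
    case File.open(log_path, [:write, :binary]) do
      {:ok, fd} ->
        {:ok, %{fd: fd, current_offset: 0}}

      {:error, reason} ->
        # Stop without entering the GenServer loop:
        # start_link/1 then returns {:error, reason}.
        {:stop, reason}
    end
  end
end
```

With a path inside a directory that doesn't exist, start_link/1 returns {:error, :enoent}. Since start_link links the two processes, a caller that isn't trapping exits also receives the exit signal.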

Put

def put(key, value) do
  GenServer.call(__MODULE__, {:put, key, value})
end

def handle_call(
  {:put, key, value}, 
  _from, 
  %{fd: fd, current_offset: current_offset} = state) do
    
  :ok = IO.binwrite(fd, value)
  size = byte_size(value)

  LogKV.Index.update(key, current_offset, size)

  new_state = %{state | current_offset: current_offset + size}
  {:reply, {:ok, {current_offset, size}}, new_state}
end

The LogKV.Writer.put/2 function sends a message to the LogKV.Writer process, which appends the value to the log file with IO.binwrite(fd, value). Then it updates the index using current_offset, to know where the value is within the log file, and the value's size, to know how many bytes we need to read from the file.
Finally, we update the writer's state, setting the new current_offset.

In this way we make the values persistent. The index is not persistent though, since its state is only kept in memory. We will see in the second part (next week) how we can simply store the index metadata along with the values in the log file, making the index recoverable.

Full code at this link: writer.ex


Let's try the writer on iex

iex> LogKV.Index.start_link([])
iex> LogKV.Writer.start_link("test.db")
{:ok, #PID<0.197.0>}

After the Index, we start the LogKV.Writer process, passing the path of our log file, test.db in this case. You'll see that the test.db file is created, in the directory where you started the iex session, as soon as the Writer process starts.

iex> LogKV.Writer.put("ltc","32.85")
{:ok, {0, 5}}

We save the string "32.85" for the key "ltc". Let's add a few more keys:

iex> LogKV.Writer.put("eth","130.98")
{:ok, {5, 6}}
iex> LogKV.Writer.put("btc","4411.99")
{:ok, {11, 7}}
iex> LogKV.Index.lookup("btc")
{:ok, {11, 7}}

You can see how the offset increases, that the size is the number of bytes of the value, and that the Writer updates the Index for us.

If we check what's inside the file test.db, this is what we see

$ cat test.db
32.85130.984411.99

which is the result of the appends of the values. We are able to distinguish them thanks to the data in the Index.

If we update a key, the new value is appended to the end of the file and the old value remains where it was in the log file.

iex> LogKV.Writer.put("eth","131.00")
{:ok, {18, 6}}
$ cat test.db
32.85130.984411.99131.00

The new offset is 18: the Index now points to the new value.

LogKV.Reader

The Reader's aim is to pull values from the log file using the Index. Since the Reader opens the file in read-only mode, and the data appended to the log file is immutable, we can start many Readers concurrently, serving values in parallel.

Initialisation

defmodule LogKV.Reader do
  use GenServer

  def start_link(log_path) do
    GenServer.start_link(__MODULE__, log_path)
  end

  def init(log_path) do
    fd = File.open!(log_path, [:read, :binary])
    {:ok, %{fd: fd}}
  end

end

The reader doesn't have a fixed name because we can have multiple readers for one single log-file. The file is opened during initialisation and the file pid is kept in the state.

Get - Reading values from the log

def get(pid, key) do
  GenServer.call(pid, {:get, key})
end

def handle_call({:get, key}, _from, %{fd: fd} = state) do
  case LogKV.Index.lookup(key) do
    {:ok, {offset, size}} ->
      {:reply, :file.pread(fd, offset, size), state}

    {:error, _} = error ->
      {:reply, error, state}
  end
end

get/2 sends a call message to the Reader process identified by pid.
If the key doesn't exist in the Index, we just proxy the error, which will be {:error, :not_found}.
If the key is found, we use the Erlang function :file.pread/3, which reads size bytes from the file descriptor starting at the offset location.

Full code at this link: reader.ex


Let's try the Reader on iex. First we need to start the Index and the Writer; let's assume they are still running from the previous example.

iex> {:ok, pid} = LogKV.Reader.start_link "test.db"
iex> LogKV.Reader.get(pid, "btc")
{:ok, "4411.99"}

It seems to work great. Let's try to get the value of the eth key, which has both an old and a new value in the log.

iex> LogKV.Reader.get(pid, "eth")
{:ok, "131.00"}

Good: it returns the correct value, the latest one we've put.

GitHub Repo

At this commit you can find the full working code of this article's implementation.

Wrap up

We learned how to leverage sequential writes by implementing an Index, a Writer and a Reader. This is a super-simple implementation, but it helped us introduce the topic conceptually. There are obviously several issues, like:

  • The index is kept in memory. Memory then limits the number of keys we can have in our storage engine.

  • If our storage engine crashes, we lose the index (which is only in memory) and can't recover the data. This can be fixed by appending the keys along with the values: this way we can recover the index by scanning the log file.

  • The log grows indefinitely, keeping the old values. We need to cap the log size and get rid of the old values. This leads to important concepts like segments and compaction.
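The recovery idea in the second point can be sketched under one assumed record format, which is not necessarily the one used in the next part: each record stores the key size and value size in a fixed 6-byte header, followed by the key and the value. Scanning the log from offset 0 is then enough to rebuild the index. LogRecovery and its header layout are hypothetical.

```elixir
# Hypothetical record format:
#   <<key_size::16, value_size::32, key::binary, value::binary>>
defmodule LogRecovery do
  # 2 bytes for the key size + 4 bytes for the value size
  @header_size 6

  # Encode one record so that the key travels with its value.
  def encode(key, value) do
    <<byte_size(key)::16, byte_size(value)::32, key::binary, value::binary>>
  end

  # Rebuild the index by scanning the whole log from offset 0.
  # Later records overwrite earlier ones in the Map, so each key
  # ends up pointing at its latest value.
  def rebuild_index(log), do: scan(log, 0, %{})

  defp scan(
         <<key_size::16, value_size::32, key::binary-size(key_size),
           _value::binary-size(value_size), rest::binary>>,
         offset,
         index
       ) do
    value_offset = offset + @header_size + key_size
    index = Map.put(index, key, {value_offset, value_size})
    scan(rest, value_offset + value_size, index)
  end

  # End of the log, or a truncated trailing record (e.g. a crash
  # mid-write): this sketch just stops scanning.
  defp scan(_incomplete, _offset, index), do: index
end
```

For the four updates of our example, each record is 6 bytes of header plus 3 bytes of key plus the value, so the rebuilt index points "ETH" at offset 54, where "131.00" is stored.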

In the next parts we will dig into these issues expanding the implementation of our storage engine.