Published articles in the series:

  1. Part 1
  2. Part 2 (this article)

In the part 1, we've introduced a simple implementation of a kv-store engine, written in Elixir. We developed three modules: an Index, a Writer and a Reader. The Writer writers the values in a log-file keeping the keys in a in-memory Index. Since the state of the index is only kept in memory, once the index process fails, or the whole engine crashes, we are not able to distinguish the values ...

In this part we'll do a step further, making both keys and values persistent, being able to recover the Index in the case of a failure.  

Index Recovery


Since the keys are only kept in memory by the Index process, if the Index (or the whole engine) crashes we loose the keys! At least we have stored the values in log-file, right? Not really, since without the Index we don't know where a value starts and ends.

Because the keys and {offset, size}data are so important, we need to store them somewhere. The best way is to store them along with the values, in the log-file, keeping the sequential access to the disk. In this way we append just few more bytes for each entry, making us able to recover the Index from the log-file.  

Taking a bit of inspiration from the Bitcask design, let's see how we can model our entry.

Single entry to save both Key and Value in the log-file

  1. In the first 8 bytes of the sequence we store a timestamp in a 64-bit unsigned int. The integer is large enough to store the time in milliseconds. The timestamp doesn't help us to rebuild the index, but it could be useful to understand when each single entry was appended.
  2. The key length is variable, and it's size needs then to be saved. With just 2 bytes, using a 16bit unsigned int, we can represent a maximum key size of ~ 65kb, which for our case is more than enough.
  3. In the part-1 we didn't have any limitation to the value's size. In this case we need to put use a fixed number of bytes to represent this size. With a 32bit unsigned int, the maximum allowed value's size would be around 4.30GB. If you need to save values of this kind of sizes, maybe is better to use the filesystem directly!
  4. Then we append the key and the value.


Let's see now how to implement this new design, starting from the Writer.

LogKV.Writer

Consider the part-1 Writer for a moment. It appends only the value and it updates the index

def handle_call(
  {:put, key, value}, _from, 
  %{fd: fd, current_offset:: current_offset} = state) 
do  
  :ok = IO.binwrite(fd, value)
  size = byte_size(value)
  LogKV.Index.update(key, current_offset, size)
  ...
end

Now we need to create a function that, given a key and a value, builds the binary data representation seen previously, with timestamp, key size, value size, key and value.

Timestamp

Let's start with the timestamp.

iex> timestamp = :os.system_time(:millisecond)
1543706328075

to convert the integer to a binary (uint64 big endian representation) we use this statement

iex> timestamp_data = <<timestamp::big-unsigned-integer-size(64)>>
<<0, 0, 1, 103, 108, 17, 60, 11>>

where timestamp_data now contains the first 8 bytes of our entry.

Key and Value Size

Using the same conversion statement used for the timestamp, we now convert the key size to a binary (uint16 big-endian).

iex> key = "btc"
iex> key_size = byte_size(key)
3
iex> key_size_data = <<key_size::big-unsigned-integer-size(16)>>
<<0, 3>>

And for the value

iex> value = "4411.99"
"4411.99"
iex> value_size = byte_size(value)
7
iex> value_size_data = <<value_size::big-unsigned-integer-size(32)>>
<<0, 0, 0, 7>>

We have everything now to write the first part of our kv_to_binary/2 function, where we calculate sizes and convert the integers into binaries.

defp kv_to_binary(key, value) do
  timestamp = :os.system_time(:millisecond)
  timestamp_data = <<timestamp::big-unsigned-integer-size(64)>>

  key_size = byte_size(key)
  value_size = byte_size(value)

  key_size_data = <<key_size::big-unsigned-integer-size(16)>>
  value_size_data = <<value_size::big-unsigned-integer-size(32)>>
    
  sizes_data = <<key_size_data::binary, value_size_data::binary>>
    
  ...

in the last line we see how we've built sizes_data, which is the concatenation of the key_size_data and value_size_data binaries.

We then concatenate key and value binaries all together with sizes_data making a single data variable.

kv_data = <<key::binary, value::binary>>
data = <<timestamp_data::binary, sizes_data::binary, kv_data::binary>>

The reason why I've divided the building into different steps is because we need the relative offset of the value within the binary.

logkv_part2_relative_offset

value_rel_offset = byte_size(timestamp_data) + 
                   byte_size(sizes_data) + 
                   key_size

To make the Index to point directly to the value, we will use the relative_offset to the current_offset of the Writer.

kv_to_binary(key, value)

Let's see now the full implementation

defp kv_to_binary(key, value) do
  timestamp = :os.system_time(:millisecond)
  timestamp_data = <<timestamp::big-unsigned-integer-size(64)>>

  key_size = byte_size(key)
  value_size = byte_size(value)

  key_size_data = <<key_size::big-unsigned-integer-size(16)>>
  value_size_data = <<value_size::big-unsigned-integer-size(32)>>

  sizes_data = <<key_size_data::binary, value_size_data::binary>>

  kv_data = <<key::binary, value::binary>>

  data = <<timestamp_data::binary, sizes_data::binary, kv_data::binary>>

  value_rel_offset = byte_size(timestamp_data) + 
                     byte_size(sizes_data) + 
                     key_size
  
  {data, key_size, value_rel_offset, value_size}
end

The function returns the full binary data we are going to write in the log-file, the value_size and the value_rel_offset.

handle_call {:put, key, value}

We can now use the new data coming from kv_to_binary/2.

def handle_call(
   {:put, key, value}, _from, 
   %{fd: fd, current_offset: current_offset} = state) 
do

  {data, _key_size, value_rel_offset, value_size} = kv_to_binary(key, value)
  
  :ok = IO.binwrite(fd, data)

The LogKV.Write.put/2 function sends a {:put, key, value} message to the Writer process which handles it using this handle_call/3. The key/value pair is than converted to a data binary, which can be directly appended to the log-file, with IO.binwrite(fd, data).

current_offset is the absolute offset, pointing now to the beginning of the whole entry, which starts with the timestamp. Since we want to make the index pointing directly to the value, we can use the current_offset and the value rel_offset to calculate the value absolute offset, like so

value_offset = current_offset + value_rel_offset
LogKV.Index.update(key, value_offset, value_size)

The current_offset has then to be updated to point to the end of the file, along with the index state

new_state = %{state | current_offset: value_offset + value_size}
{:reply, {:ok, {value_offset, value_size}}, new_state}

We are ready now to see our writer in action on iex
iex> LogKV.Index.start_link []
{:ok, #PID<0.134.0>}
iex> LogKV.Writer.start_link "test.db"
{:ok, #PID<0.136.0>}
iex> LogKV.Writer.put("ltc","32.85")
{:ok, {17, 5}}
iex> LogKV.Writer.put("eth","130.98")
{:ok, {39, 6}}
iex> LogKV.Writer.put("btc","4411.99")
{:ok, {62, 7}}

the put/2 returned value is {:ok, {value_offset, value_size}} and we see that the value_offset is much higher than before.

This is what we should expect. Three entries with their timestamp, sizes, key and value.

Writer Output

Inspecting test.db file with the hexdump command

$ hexdump -C test.db
00 00 01 67 75 e8 3f ca  00 03 00 00 00 05 6c 74  |...gu.?.......lt|
63 33 32 2e 38 35 00 00  01 67 75 e8 3f cd 00 03  |c32.85...gu.?...|
00 00 00 06 65 74 68 31  33 30 2e 39 38 00 00 01  |....eth130.98...|
67 75 e8 3f cd 00 03 00  00 00 07 62 74 63 34 34  |gu.?.......btc44|
31 31 2e 39 39                                    |11.99|

we can easily see, in the output on the right, the keys and values, while in the number hex output, after the first 8 bytes (the timestamp) you find

  • 00 03 two bytes representing the size of the first key "ltc"
  • 00 00 00 05 4 bytes, the size of the first value "32.85"

Full code at this link: writer.ex

LogKV.Index

We still could use the same Index implemented in the part-1, but since the Writer now writes a much more detailed log-file, we can add a recovery process to load the keys and the values' offsets from the log-file.
Starting from the beginning we evaluate the first entry and after getting the key and the value's offset, we jump to the next entry without the need of reading the value.

index building process

Let's print here the full code and then see line by line what it does

defp load_offsets(fd, offsets \\ %{}, current_offset \\ 0) do
  :file.position(fd, current_offset)

  with 
    <<_timestamp::big-unsigned-integer-size(64)>> <- IO.binread(fd, 8),
    <<key_size::big-unsigned-integer-size(16)>> <- IO.binread(fd, 2),
    <<value_size::big-unsigned-integer-size(32)>> <- IO.binread(fd, 4),
    key <- IO.binread(fd, key_size) 
  do
    
    value_abs_offset = current_offset + 14 + key_size

    offsets = Map.put(offsets, key, {value_abs_offset, value_size})

    load_offsets(fd, offsets, value_abs_offset + value_size)
  else
    :eof -> {current_offset, offsets}
  end
end

The private function load_offsets/3 considers that the log-file is already open with [:read, :binary] options, and the file's pid is passed as fd.

At first the offsets map is empty and the current_offset is at the beginning of the file.

:file.position/2 seeks the file descriptor at the current_offset position. We than use the powerful with construct, which helps us to executes a list of statements, pattern matching the results.

<<timestamp::big-unsigned-integer-size(64)>> <- IO.binread(fd, 8)

in this way we first read 8 bytes from the file, IO.binread(fd,8), and with <<timestamp::big-unsigned-integer-size(64)>> we pattern match the 8 bytes binary doing exactly the opposite of the conversion we did before in the writer.We read the binary as an uint64 big-endian and set it into the timestamp variable.

We then do the same for key_size, value_size and key, which is all we need to recover they key, calculate the value's absolute offset and update the offsets map.

We then recursively call the function until :eof is reached. Each time the function is called, it positions the file descriptor to the beginning of the new entry. Once the end-of-file it returns the map of the loaded offsets along with the current_offset.

init(log_path)

We are almost finished. We just need to integrate the load_offsets/3 function to load the offsets during Index initialization.
We first implement a new LogKV.Index.start_link function, that accepts the log-file path to recover from, which will be passed to a new init.

def start_link(log_path) when is_binary(log_path) do
  GenServer.start_link(__MODULE__, log_path, name: __MODULE__)
end

def init(log_path) do
  with {:ok, fd} <- File.open(log_path, [:read, :binary]),
       {_current_offset, offsets} = load_offsets(fd) 
  do
    File.close(fd)
    {:ok, offsets}
  else
    _ -> init(:empty)
  end
end

the new init/1 opens the log-file and loads the offsets that will become the new index state. This process could take a bit, depending on the amount of entries present in the log.

Full code at this link: index.ex

GitHub Repo - Part 2 release

At this commit you find the full working code of this article's implementation.

Wrap Up

We learned how change our Writer and Index implementation to make the Index recoverable from the log-file.
There is still a lot to discover. In the next part we are going to make the engine a bit more reliable, with the help of supervisors.

We see quickly, in the video below, how to use this implementation on the interactive shell.