Processing Large CSV files with Elixir Streams

We need to process a large CSV file of minute-by-minute prices and volumes. Our task is pretty simple: we just want to get the first row of the year 2015 that contains valid data. At first, this seems like an easy task we could tackle with String.split and Enum.map/filter/find. But what happens when the CSV file is large?

Let’s see how with Elixir Streams we can elegantly manage large files and create composable processing pipelines.

Getting a large CSV from Kaggle

First we need a real, large CSV file to process, and Kaggle is a great place to find this kind of data to play with. To download the CSV file, just go to the Kaggle Bitcoin Historical Data page and download the bitstampUSD CSV.

Kaggle – Historical BTC Page

The CSV contains the historical Bitstamp BTC-USD prices and volumes aggregated at 1-minute intervals. Once unzipped, it is around 220 MB, with 8 columns and 3.6 million rows.

Historical BTC-USD CSV

We will focus on the first two columns, Timestamp and Open, filtering out the header and all the rows where the open price is NaN. Converting each Timestamp to a DateTime struct, we then find the first row of 2015 and return it.

The greedy approach

File.read!("large.csv")
|> String.split("\n")
|> Enum.map(&String.split(&1, ","))
|> Enum.filter(fn
  ["Timestamp" | _] -> false
  [_, "NaN" | _] -> false
  _ -> true
end)
|> Enum.find(fn
  [timestamp | _] ->
    dt =
      timestamp
      |> String.to_integer()
      |> DateTime.from_unix!()

    dt.year == 2015
end)
|> IO.inspect() # RESULT ["1420070400", "321", ...]

The first column is the timestamp, a Unix time in seconds, where 1420070400 means 1st Jan 2015 00:00 UTC.
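As a quick check of that conversion, here is a minimal sketch that turns the raw column value into a DateTime the same way the pipeline does. It uses only the standard library; the variable name dt is just for illustration.

```elixir
# Convert the raw Timestamp column (a string of Unix seconds) to a DateTime.
dt =
  "1420070400"
  |> String.to_integer()
  |> DateTime.from_unix!()

IO.inspect(dt)       # ~U[2015-01-01 00:00:00Z]
IO.inspect(dt.year)  # 2015
```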

It works, but this approach hides big memory and processing issues. With the Erlang Observer we can easily see the total memory allocated by our process.

iex> :observer.start

Greedy approach – 5.5GB peak

Just to process a 220 MB CSV file, which is not even that big, we hit a peak of 5.5GB of allocated memory.

Greedy Steps – Memory Allocation

Let’s inspect the code to understand what it does and how the data moves between each step.

line 1: File.read!("large.csv") loads the whole CSV file into memory. In this first step alone, we allocate 220 MB. And that’s just the beginning.

line 2: String.split(text,"\n") takes the loaded text and splits it into lines, creating a list of new strings representing the rows of the CSV.

line 3: Enum.map(rows, &String.split(&1, ",")) splits each single CSV row into a list of columns. The first column is the Timestamp, the second the Open price, and so on.

[
  ["Timestamp", "Open", "High", "Low", "Close", ...],
  ...
  ["1420070400", "321", ...],
  ...
  ["1541888400", "6359.96", ...],
  ["1541888460", "NaN", ...],
  ...
]

line 4: We use Enum.filter/2 and pattern matching to filter out the header and rows with NaN values in the Open column.

[
  ...
  ["1420070400", "321", ...]
  ...
  ["1541888400","6359.96",..],
  ["1541888520","6363.73",..],
  ...
]

line 9: Enum.find/2 walks the list of mapped and filtered rows and returns the first row that matches the condition. It stops early, but by this point the previous steps have already processed every row in the file.

["1420070400", "321", ...]

We see how each step before Enum.find goes through the whole data set, creating a new big data set. String.split splits the whole text into lines, Enum.map maps all the lines into columns, and so on. This is a huge waste of memory and processing resources.

Lazy Processing with Elixir Streams

We don’t need to load all the data in memory! We can actually try to load and process one line of text at a time. This is where Elixir Streams come into play!

Streams are composable, lazy enumerables

https://hexdocs.pm/elixir/Stream.html

Lazy means that when we use a Stream to process a CSV file, it doesn’t open and load the whole file. Instead, it reads one line at a time. We can compose complex pipelines with different processing steps where the stream reads and pipes one single line at a time, without having to process all the lines at every single step.

Elixir Streams – Looping – One line at a time

In the image above, we see how a single line is read and processed by the whole pipeline. Once the final step finishes processing it, the stream reads the next line and pipes it through.
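The same one-item-at-a-time flow can be observed without any file. The sketch below is only an illustration on a hypothetical in-memory list: each step sends a tagged message to the current process, so the order of the steps can be inspected afterwards. The :step tag and the drain helper are names made up for this example.

```elixir
# Each step reports to the current process, so we can inspect the order
# in which items pass through the pipeline.
parent = self()

result =
  ["a", "b", "c"]
  |> Stream.map(fn row ->
    send(parent, {:step, :map, row})
    String.upcase(row)
  end)
  |> Stream.filter(fn row ->
    send(parent, {:step, :filter, row})
    row != "A"
  end)
  |> Enum.find(fn row ->
    send(parent, {:step, :find, row})
    row == "B"
  end)

# Collect the tagged messages from the mailbox, oldest first.
drain = fn drain, acc ->
  receive do
    {:step, _name, _row} = msg -> drain.(drain, [msg | acc])
  after
    0 -> Enum.reverse(acc)
  end
end

events = drain.(drain, [])

IO.inspect(result)
# "B"
IO.inspect(events)
# [
#   {:step, :map, "a"},
#   {:step, :filter, "A"},
#   {:step, :map, "b"},
#   {:step, :filter, "B"},
#   {:step, :find, "B"}
# ]
```

Note that "c" never appears in the events: once Enum.find has its match, the stream is not asked for another item.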

Instead of opening a file with File.open, we use the function File.stream! to create a Stream.

iex> File.stream!("large.csv")
%File.Stream{
  line_or_bytes: :line,
  modes: [:raw, :read_ahead, :binary],
  path: "large.csv",
  raw: true
}

File.stream!("large.csv") returns a Stream without opening the file. We can use this stream to compose a pipeline using the Stream module functions.

iex> File.stream!("large.csv") |> Stream.map(&String.split(&1,","))
#Stream<[
  enum: %File.Stream{
    line_or_bytes: :line,
    modes: [:raw, :read_ahead, :binary],
    path: "large.csv",
    raw: true
  },
  funs: [#Function<48.51129937/1 in Stream.map/2>]
]>

Instead of using Enum.map, we use Stream.map, which returns a stream. This stream has a funs field, which is a list of functions that will be applied to each row. At the moment there is no processing; we are only composing our pipeline.

To run our stream, we need to use a function that actually enumerates it, such as Enum.count/take/find/map/filter.

iex> File.stream!("large.csv") |> Enum.count()
3603137

As soon as Enum.count tries to loop through the stream, the stream opens the file and starts reading and passing the lines to Enum.count. If you look at the Erlang Observer, you’ll see that there is almost no memory peak, since the Enum.count function counts the lines one at a time.
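To try this without downloading the full data set, here is a small sketch that runs against a tiny sample file. The file name stream_sample.csv and its three data rows are made up for the example; they are not taken from the Kaggle CSV.

```elixir
# Write a tiny sample CSV to a temporary file (hypothetical data).
path = Path.join(System.tmp_dir!(), "stream_sample.csv")
File.write!(path, "Timestamp,Open\n1,NaN\n2,10.5\n3,11.0\n")

# Composing the pipeline only describes the work; no rows are processed yet.
rows =
  path
  |> File.stream!()
  |> Stream.map(&String.trim/1)
  |> Stream.map(&String.split(&1, ","))

# Enum functions enumerate the stream, which triggers the actual reads.
line_count = Enum.count(rows)
first_two = Enum.take(rows, 2)

IO.inspect(line_count)  # 4
IO.inspect(first_two)   # [["Timestamp", "Open"], ["1", "NaN"]]

File.rm!(path)
```

Enum.take(rows, 2) only needs the first two rows, so the remaining ones are never split into columns.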

Compose our pipeline with streams

It’s now time to build our processing pipeline using Streams, instead of just Enum functions. You’ll see that the code will seem similar, but the way data flows between functions is quite different.

Let’s write the first part of the pipeline.

File.stream!("large.csv")
|> Stream.map(&String.trim(&1))
|> Stream.map(&String.split(&1, ","))
|> Stream.filter(fn
  ["Timestamp" | _] -> false
  [_, "NaN" | _] -> false
  [timestamp | _] ->
    IO.puts("filter -> #{timestamp}")
    true
end)
#Stream<[
  ...
  funs: [#Function<48.51129937/1 in Stream.map/2>,
   #Function<48.51129937/1 in Stream.map/2>,
   #Function<40.51129937/1 in Stream.filter/2>]
]>

This first part of the pipeline returns a stream. The functions we pass to map and filter are the same as in the initial example, apart from the extra String.trim step and the IO.puts log, and they are saved inside the stream’s processing pipeline. The trim is needed because File.stream! keeps the trailing newline on each line. The two Stream.map steps trim each line and split it into columns. The Stream.filter step filters out the header and the rows with NaN open prices.

|> Enum.find(fn
  [timestamp | _] ->
    ts = String.to_integer(timestamp)
    dt = DateTime.from_unix!(ts)
    IO.puts("find -> #{timestamp} - #{dt.year}")
    dt.year == 2015
end)

This is the final step, the one that actually runs the pipeline. The function we pass to Enum.find converts the timestamp to a DateTime struct and returns true when the year is 2015.
Enum.find receives from the stream only the rows that passed the filter in the previous step. Once the condition dt.year == 2015 is met, it stops and returns that row.

filter -> 1370321820
find -> 1370321820 - 2013
filter -> 1370321880
find -> 1370321880 - 2013
...
filter -> 1420070340
find -> 1420070340 - 2014
filter -> 1420070400
find -> 1420070400 - 2015
["1420070400", "321", "321", "321",...]

Looking at the logs we’ve put in the filter and find functions, we can see that each line is processed by the whole pipeline before the next one is read. Once Enum.find finds the first row of 2015, it stops and returns it, without processing the rest of the stream.
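Putting the two parts together, the lazy pipeline could be wrapped in a function along these lines. This is a sketch, not the code used for the measurements in this post: the module name BtcCsv, the function names, the sample file and its rows are all hypothetical, the IO.puts logs are dropped, and a clause for blank lines is added as a precaution.

```elixir
defmodule BtcCsv do
  # Hypothetical module that wraps the lazy pipeline in one function.

  # Returns the first row of `year` with a valid Open price, or nil.
  def first_valid_row(path, year) do
    path
    |> File.stream!()
    |> Stream.map(&String.trim/1)
    |> Stream.map(&String.split(&1, ","))
    |> Stream.filter(&keep?/1)
    |> Enum.find(fn [timestamp | _] -> year_of(timestamp) == year end)
  end

  # Drop the header, rows with a NaN Open price, and blank lines.
  defp keep?(["Timestamp" | _]), do: false
  defp keep?([_, "NaN" | _]), do: false
  defp keep?([""]), do: false
  defp keep?(_), do: true

  defp year_of(timestamp) do
    timestamp
    |> String.to_integer()
    |> DateTime.from_unix!()
    |> Map.fetch!(:year)
  end
end

# Tiny made-up sample around the 2014/2015 boundary.
path = Path.join(System.tmp_dir!(), "btc_sample.csv")

File.write!(path, """
Timestamp,Open,High
1420070280,NaN,NaN
1420070340,320,320
1420070400,NaN,NaN
1420070460,321,322
""")

row = BtcCsv.first_valid_row(path, 2015)
IO.inspect(row)  # ["1420070460", "321", "322"]

File.rm!(path)
```

In this sample the row at 1420070400 has a NaN Open price, so the first valid row of 2015 is the one a minute later.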

Benchmarking with Benchee

Let’s compare the greedy and lazy approaches. We benchmark their speed with Benchee while monitoring the memory footprint with the Erlang Observer.

Greedy VS Lazy

We see that the lazy approach not only reduces memory consumption dramatically, but also makes the code much faster than the greedy version.
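The benchmark code itself is not shown here. As a rough, dependency-free stand-in, the sketch below times both versions once with :timer.tc from the Erlang standard library. This is not Benchee and a single run is not a proper benchmark; the synthetic file, its size, and all variable names are assumptions made for the example.

```elixir
# Synthetic sample: one row per minute, starting 100 minutes before
# 2015-01-01 00:00 UTC (hypothetical data, not the Kaggle file).
path = Path.join(System.tmp_dir!(), "bench_sample.csv")
start = 1_420_070_400 - 100 * 60

rows =
  for i <- 0..9_999 do
    "#{start + i * 60},#{300 + rem(i, 50)}\n"
  end

File.write!(path, ["Timestamp,Open\n" | rows])

# Shared predicates, so both versions do the same work per row.
keep? = fn
  ["Timestamp" | _] -> false
  [_, "NaN" | _] -> false
  [""] -> false
  _ -> true
end

in_2015? = fn [ts | _] ->
  DateTime.from_unix!(String.to_integer(ts)).year == 2015
end

greedy = fn ->
  File.read!(path)
  |> String.split("\n")
  |> Enum.map(&String.split(&1, ","))
  |> Enum.filter(keep?)
  |> Enum.find(in_2015?)
end

lazy = fn ->
  File.stream!(path)
  |> Stream.map(&String.trim/1)
  |> Stream.map(&String.split(&1, ","))
  |> Stream.filter(keep?)
  |> Enum.find(in_2015?)
end

# :timer.tc/1 returns {elapsed_microseconds, result}.
{greedy_us, greedy_row} = :timer.tc(greedy)
{lazy_us, lazy_row} = :timer.tc(lazy)

IO.puts("greedy: #{greedy_us} us, lazy: #{lazy_us} us")
IO.inspect(greedy_row == lazy_row)  # true

File.rm!(path)
```

Both versions return the same row; only the amount of data they touch before finding it differs.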