Elixir Stream and large HTTP responses: processing text

You find the code, of this and the previous article, on poeticoding/httpstream_articles GitHub repo. The code at this repo is not meant for production use, is just part of the examples of these articles.

In the last article we’ve put together the concepts of HTTP async response and processing large files using Elixir Streams, to be able to easily download and save a large file using our HTTPStream module.

The stream we’ve built around HTTPoison is able to emit the chunks of an HTTP chunked response. In this way we avoid the huge memory impact we could have saving the whole response into memory.

In the last example of the previous article, each chunk is streamed down to the pipeline’s functions, saved into our local file and then garbage collected. Then, it was easy to add compression just including the StreamGzip.gzip function in the pipeline.

Text response

Previously we where treating the response just as a binary, delegating the compression to the StreamGzip library and savings the result into a file.

In this part we want to process the lines of a large text file. To make it fun and a bit more realistic, we are going to get some inspiration from the first day of the Advent of code challenge. There are also some cool videos made by José Valim, where he uses Elixir to solve the challenges of the Advent of Code.

I’ve generated and uploaded a 125Mb text file at this link

https://www.poeticoding.com/downloads/httpstream/numbers.txt

with 30 million lines. In each line there is a random integer from -1000 to 1000

767
138
-701
98
...

We want to:

  • 1. process this text file on the fly, while downloading it, line by line.
  • 2. like we did in the first part, we want to avoid big memory spikes (we cannot load the full response into memory)
  • 3. last but not least, if we want to process just the first 30 lines of text, we just want to download the first few chunks needed.

To do just some tests, you can also find a smaller version of 4.5Mb with 1 million lines

https://www.poeticoding.com/downloads/httpstream/numbers_small.txt

which is the same you find in the poeticoding/httpstream_articles repo.

What we are going to build

To process an HTTP response line by line, we need something that works like

File.stream! "numbers.txt" #, [], :line

which creates a stream that opens a file and, by default, emits each line. We want to do something similar with our HTTP async response.

Instead of changing the current HTTPStream module implementation, we want to take full advantage of the Elixir Stream’s composability, writing a function we just add to the pipeline, that converts the chunks to lines.

Chunks to Lines

Ok, but why can’t we just use the implementation we’ve done already for the binary file?

The problem with the previous implementation is that the stream we’ve built emits chunks without distinguishing the lines. We need an Elixir Stream that emits lines instead of chunks.

Unfortunately this is not as easy at it seems: splitting chunks into lines is not enough. This could be true only in a easy and specific case:

["767\n138\n","-701\n98\n","-504\n22"]

where each element of the list is a chunk and each chunk ends with a new line. In this easy case we just need to split each chunk, filter out the empty strings, emitting the result

iex> String.split("767\n138\n")
["767","138",""]

The problem though, is that we do not have any guarantee that all the chunks will be in this form. It’s actually more than likely that multiple parts of the same line will be over different chunks

["767\n13","8\n-701\n98\n-","504\n22"]

You see how in this case the first two chunks don’t end with a newline character \n and the second chunk starts with the final part of the line started in the previous chunk.

Stream.transform

We are now going to implement a function HTTPStream.lines(chunks_enum) which takes a stream of chunks as an input and returns a stream of lines.

chunk to lines
Chunk to Lines

We’ll then use this function as part of our pipeline in this way:

HTTPStream.get("https://.../numbers.txt")
|> HTTPStream.lines()
|> Stream.map(fn line -> ... end)
...

We don’t want to use Enum.reduce since this function is greedy and it would hold all the lines in memory. Instead, we can use Stream.transform which is lazy

Stream.transform(enum, initial_accumulator, fn element, acc ->
...
{list_of_elements_to_emit, new_accumulator}
end)

The first element of the returned tuple is a list of elements we are going to emit, in our case lines.

Accumulating final part of the chunk and using it when processing next chunk
Accumulating final part of the chunk and using it when processing next chunk

We see from the image that the accumulator acc is the last part of the first chunk and is prepended to the second chunk.

We need a function, we’ve called next_lines, that splits a chunk into separate lines and returns a tuple with two elements: {the lines we want to emit, and the last part of the chunk}.

Stream.transform(chunks_enum, "", fn chunk, prev ->
  {lines, last_part} = next_lines(chunk, prev)
  {lines, last_part}
end)

Our initial accumulator is an empty string. This empty string will be passed as prev while processing the first chunk.

Recursive implementation

We now need to write the next_lines(chunk,prev) function. We can implement it using recursion, going through each single UTF-8 character looking for newlines. Remember that to behave like File.stream! we need to preserve the newlines \n.

def next_lines(chunk, prev), do: next_lines(chunk,prev,[])

def next_lines(<<"\n"::utf8, rest::binary>>, prev, lines) do
		next_lines(rest,"",[prev <> "\n" | lines])
	end

def next_lines(<<c::utf8, rest::binary>>, prev, lines) do
  next_lines(rest,<<prev::binary, c::utf8>>,lines)
end

def next_lines(<<>>, prev, lines), do: {Enum.reverse(lines), prev}

#https://github.com/poeticoding/httpstream_articles/blob/36bc2167b7024a990b04b28f9447fb9bc0e0310e/lib/http_stream.ex#L78

Ok, there is a lot happening here. Let’s start from the beginning

  • next_lines(chunk,prev\\"")
    The first clause is just a helper. We pass a chunk, and the accumulator prev. The function calls next_lines/3 , passing an empty list of lines as third argument.
  • next_lines(<<"\n"::utf8, rest::binary>>, prev, lines)
    We are pattern matching a sequence of UTF-8 characters. This function is called only when we reach a newline character. We then call recursively next_lines passing the rest of the chunk we need to process, setting the accumulator to an empty string "", passing list of lines where we’ve prepended the accumulated line, prev.
  • next_lines(<<c::utf8, rest::binary>>,prev,lines)
    Since every time c is a newline the clause above is matched, in this clause c != "\n" so we just need to append it to prev and recursively call next_lines going through the rest of the chunk.
  • next_lines(<<>>,prev,lines)
    <<>> is an empty binary and means we’ve reached the end of the chunk. For performance reason we’ve pepended the lines.
    [prev <> "\n" | lines] is faster than lines ++ [prev], especially when the lines list is big. When we reach the end of our recursion, we need to reverse the lines’ list.

Let’s try this function on iex

# ["767\n13","8\n-701\n98\n-","504\n22"]
iex> {lines, prev} = HTTPStream.next_lines "767\n13"
{["767\n"], "13"}
iex> {lines, prev} = HTTPStream.next_lines "8\n-701\n98\n-", prev
{["138\n", "-701\n", "98\n"], "-"}
iex> {lines, prev} = HTTPStream.next_lines "504\n22", prev
{["-504\n"], "22"}
iex> prev
"22"

Perfect, exactly what we need 👍. We go through the chunks’ list passing the obtained prev to the next call.

HTTPStream.lines

next_lines returns the same tuple we need to return in the reducer function passed to Stream.transform/3. We can then write HTTPStream.lines/1 in a nice and compact way

def lines(chunks_enum) do
  chunks_enum
  |> Stream.transform("",&next_lines/2)
end

Let’s try it on iex

iex> ["767\n13","8\n-701\n98\n-","504\n22"] \
...> |> HTTPStream.lines() \
...> |> Enum.each(&IO.inspect/1)
"767\n"
"138\n"
"-701\n"
"98\n"
"-504\n"
:ok

Mmm 🤔 … there is something wrong here. The last line "22" is missing.

Emitting last line

The reason why it’s not emitted is because it doesn’t end with a newline and it remains stuck as an accumulator (prev). We have to emit it when the stream is ended, but using Stream.transform/3 the reducer function doesn’t know when the stream is going to end! (Please let me know in the comments if you know there is a way to catch the end of a stream)

A workaround we can use, to let next_lines/2 know when the stream reached the end, is to add an :end atom at the end of our chunks’ stream. next_lines/2 than has to handle the case with a specific clause

def next_lines(:end,prev), do: { [prev], ""}

which emits the final line. The accumulator is set to an empty string but it could be anything at this point.

Let’s try it again on iex

iex> ["767\n13","8\n-701\n98\n-","504\n22", :end] \
...> |> HTTPStream.lines() \
...> |> Enum.each(&IO.inspect/1)
"767\n"
"138\n"
"-701\n"
"98\n"
"-504\n"
"22"
:ok

Great, it works! 🎉

But now how can we easily add a :end atom at the end of our HTTP chunked response stream?

Emitting :end at the end of the streamed HTTP response

If you have an alternative way of doing this, please share it with us posting a comment below! 👩‍💻👨‍💻

We need to make a small but significant change to our HTTPStream.get(url) function.

# HTTPStream.get/1
def get(url) do
  Stream.resource(
    start_fun,
    
    # next_fun
    fn
      #first clause
      %HTTPoison.AsyncResponse{id: id}=resp ->
        receive do
          ...
          %HTTPoison.AsyncEnd{id: ^id}->
            # emitting :end
            {[:end], {:end, resp}}
        end

      #second clause
      {:end, resp} -> 
        {:halt, resp}
    end,
		
    after_fun
  )
end
  • 1. When we receive the %HTTPoison.AsyncEnd{} message we know that we’ve reached the end of the HTTP response. Instead of just halting the stream, we emit the :end and set a new accumulator {:end, resp}, where resp is the %HTTPoison.AsyncResponse{} struct.
  • 2. After emitting :end, next_fun is called again. This time the accumulator is the one we’ve just set, {:end, resp}, which pattern matches the second clause of our next_fun.
AsyncEnd, emits :end and :halt
AsyncEnd, emits :end and :halt

Something I don’t like about this change, is that now we always have to handle the final :end, especially when saving the stream into a file.

HTTPStream.get("https://.../large_image.tiff")
|> Stream.reject(&match?(:end,&1))
|> Stream.into(File.stream!("image.tiff.gz"))
|> Stream.run()

The function in second line pattern matches each chunk and filters out the :end atom.

It’s maybe better to enable and disable the final :end via an option passed to HTTPStream.get(url, emit_end), like the version you see on GitHub.

Update: JBraungardt, in the comments below, suggested a cleaner way to emit the last :end atom, using Stream.concat(first_enum, second_enum).

def lines(enum, :next_lines) do
  enum
  |> Stream.concat([:end])
  |> Stream.transform("",&next_lines/2)
end

Sum numbers of a 30M lines remote file

Let’s use what we’ve implemented to process a 125MB remote text file with 30M numbers, each one separated by a newline \n. While processing the lines on the fly, we sum the numbers to get the total.

"https://www.poeticoding.com/downloads/httpstream/numbers.txt"
|> HTTPStream.get()
|> HTTPStream.lines()
|> Stream.map(fn line-> 
	case Integer.parse(line) do
		{num, _} -> num
		:error -> 0
	end
end)
|> Enum.sum()
|> IO.puts()

## OUTPUT
STATUS: : 200
HEADERS: : [
  {"Content-Length", "131732144"},
  {"Accept-Ranges", "bytes"},
  {"Content-Type", "text/plain"},
  ...
]

12468816

Fantastic, we got the result: 12468816! 🎉

With the Erlang Observer I’ve seen sometime some memory spike (still below 100Mb) and sometime the line seems to be almost flat. I think this could be related to how big the chunks are.

memory spike
Memory spike

In the GitHub repo you find a memory_test.exs script you can play with, to see the HTTPStream.line memory allocation with different chunks sizes. Even with a 4.5Mb file, if we exaggerate with the chunk size (like 2_000_000) we have a huge memory spike.

2mb vs 2kb chunks
2mb vs 2kb chunks

It would be great to be able to set a maximum chunks’ size in the HTTPoison options, unfortunately I didn’t find any option to do that.

String.split

Let’s see another way of writing the HTTPStream.lines(chunks) function. In the previous implementation we’ve used recursion to go through each single character of the chunk and to find newlines.

If we don’t need to preserve newlines, we can use String.split/2 along with Stream.transform/3.

def lines(enum) do
  enum
  |> Stream.transform("",fn 
    :end, prev -> 
      {[prev],""}
    chunk, prev ->
      [last_line | lines] = 
        String.split(prev <> chunk,"\n")
        |> Enum.reverse()
      {Enum.reverse(lines),last_line}
  end)
end

The idea is similar to what we did before. We split the chunk into lines and the last element of the list becomes the accumulator, which is concatenated to the next chunk.

See how we extract the last item of the list of lines.

lines = String.split(prev <> chunk, "\n")
[last_line | rev_lines] = Enum.reverse(lines)
{ Enum.reverse(rev_lines), last_line }
  • We split the concatenated string prev <> chunk obtaining a list of lines. We now need to get the last element of the list.
  • We reverse the list, creating a list of new elements. Now, the head of Enum.reverse(lines) is the last element of lines.
  • rev_lines is the list of lines we want to emit, but in the wrong order, so we emit Enum.reverse(rev_lines) and set last_line as the next accumulator.
split and extract last
split and extract last

Let’s see an example on iex

iex> chunks = ["767\n138\n-701\n98\n-5", "04\n22\n375"]
iex> [chunk | remaining_chunks] = chunks
iex> chunk
"767\n138\n-701\n98\n-5"

iex> lines = String.split(chunk,"\n")
["767", "138", "-701", "98", "-5"]
iex> [last_line | rev_lines] = Enum.reverse(lines)
["-5", "98", "-701", "138", "767"]
iex> last_line
"-5"
iex> lines_to_emit = Enum.reverse(rev_lines)
["767", "138", "-701", "98"]

#let's process another chunk
iex> [next_chunk | remaining_chunks] = remaining_chunks
iex> next_chunk
"04\n22\n375"
# we need to prepend last_line
iex> chunk = last_line <> next_chunk
"-504\n22\n375"
iex> lines = String.split(chunk,"\n")
["-504", "22", "375"]

It turns out that this implementation is much faster than the previous one. Let’s do a benchmark

Benchmark HTTPStream.line

If you want to run this benchmark on your computer, you find everything on poeticoding/httpstream_articles.

Let’s consider the stream created by

File.stream!("numbers_small.txt")

which by default emits the lines of a file. We want to compare the speed of this function with HTTPStream.line.

Instead of using a remote file, we are going to use a smaller ~4Mb version, numbers_small.txt you can find on the GitHub repo.

We need to now find a way to simulate the stream of chunks made by HTTPStream.get.

chunk_stream = File.stream!("numbers_small.txt",[],16_000)

Passing chunks size as third argument of File.stream!/3, the stream, instead of lines, will emit chunks (in this case of 16kb).

In the script bench_lines.exs we use Benchee

# bench_lines.exs
chunk_stream = File.stream!("numbers_small.txt",[],16_000)
lines_stream = File.stream!("numbers_small.txt", [], :line)

stream_sum = fn enum ->
  enum
  |> Stream.map(fn line-> 
    case Integer.parse(line) do
      {num, _} -> num
      :error -> 0
    end
  end)
  |> Enum.sum()
end


Benchee.run(%{
  "File.stream! :line" => fn ->
    lines_stream 
    |> stream_sum.()
  end,
  "with next_lines" => fn ->
    chunk_stream
    |> HTTPStream.lines(:next_lines)
    |> stream_sum.()
  end,
  "with String.split" => fn ->
    chunk_stream
    |> HTTPStream.lines(:string_split)
    |> stream_sum.()
  end
},time: 10)

$ mix run bench_lines.exs

Name                         ips        average
with String.split           3.35      298.30 ms
File.stream! :line          2.08      481.22 ms
with next_lines             1.14      875.01 ms

Comparison:
with String.split           3.35
File.stream! :line          2.08 - 1.61x slower +182.93 ms
with next_lines             1.14 - 2.93x slower +576.71 ms

The interesting thing is that the version "with String.split" is even faster than "File.stream! :line", while the first implementation we did is the slowest.

Honestly, I don’t know why the version "with String.split" is the fastest one. Maybe some optimisation in the String.split/2 function? If you are interested about these details, I’ve opened a topic about this on the elixir forum: Streaming lines from an enum of chunks.

Reducing the chunk size from 16_000 to 2_000 we see how both "with String.split" and "with next_lines" are a bit faster.

chunk_stream = File.stream!("numbers_small.txt",[],2000)

Name                         ips        average  
with String.split           3.79      263.67 ms
File.stream! :line          2.06      484.98 ms
with next_lines             1.42      706.48 ms

Comparison:
with String.split           3.79
File.stream! :line          2.06 - 1.84x slower +221.31 ms
with next_lines             1.42 - 2.68x slower +442.81 ms

I think happens because with a smaller chunk all the split, reverse and concatenation operations are faster.

Sum the first 30 numbers

The stream of lines we’ve built is lazy, this means we can take just the first 30 lines and halt the stream, without downloading the whole HTTP response.

To take the first 30 lines we use Enum.take/2

"https://www.poeticoding.com/downloads/httpstream/numbers.txt"
|> HTTPStream.get()
|> HTTPStream.lines()
|> Stream.map(fn line-> 
  case Integer.parse(line) do
    {num, _} -> num
    :error -> 0
  end
end)

|> Enum.take(30)

|> Enum.sum()
|> IO.puts()

You find this code in sum_30_lines.exs.

$ mix run sum_30_lines.exs
STATUS: : 200
HEADERS: : [
  {"Content-Length", "131732144"},
  {"Accept-Ranges", "bytes"},
  {"Content-Type", "text/plain"},
  ...
]
END_FUN

1393

It should be really quick. Once took 30 lines, the stream is halted and the HTTP connection is closed.