The other day at work someone mentioned the 1BRC (One Billion Row Challenge) on Slack, where you are supposed to read 1 billion rows from a single file and run some simple aggregations (min/mean/max per weather station).
After reading a bit, I became really interested: I was seeing languages like Java doing 6 seconds, C doing about 1.2 seconds, and so on. Elixir being a functional language that likes to copy everything for immutability, it got me wondering: what's its limit?
NOTE: I run a Ryzen 7 3700X (8 cores / 16 threads) with 32GB of RAM, and, because why not, on Windows 10.
Other stuff that I’ve tried
I ended up with 4 different solutions; here is the recap:
- File.read + Task.async_stream + GenServer per unique station (about 86 seconds)
- File.read + Task.async_stream + GenServer per unique station but state stored in ETS (about 86 seconds)
- File.read + Task.async_stream + GenServer per unique station but 8 shards per station (86 seconds again)
- File.read + Task.async_stream + GenServer with sharded ETS tables (86 seconds)
At the end of the day, the problem was simply message passing: it copies too much. I tried ETS and the like, but since I still had to send every line to a GenServer, no amount of mutable storage helped.
The GenServer was needed because I was using it to synchronize the reads and writes to its state.
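For reference, that abandoned design boils down to something like this minimal sketch (illustrative module and function names, not the exact code I benchmarked): one GenServer per station, with every raw line sent to it as a message.

defmodule StationServer do
  use GenServer

  # One server per station; every cast copies the message into the
  # receiving process, which is exactly the copying that hurt here.
  def start_link(station),
    do: GenServer.start_link(__MODULE__, station, name: {:global, station})

  def record(station, line), do: GenServer.cast({:global, station}, {:line, line})

  @impl true
  def init(_station), do: {:ok, {nil, nil, 0, 0}}

  @impl true
  def handle_cast({:line, line}, {min, max, sum, count}) do
    [_name, measurement] = String.split(line, ";")
    v = String.to_float(measurement)
    {:noreply, {min(min || v, v), max(max || v, v), sum + v, count + 1}}
  end
end

At a billion lines, that per-message copying dominates everything else.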
Getting complex
The basic idea can be seen like this:
- Create a counter for each new station storing the min, max, sum and count (sum and count for the mean).
- Having a separate counter for each station is easier to manage and lets us block just one specific counter at a time. We use counters because they are one of the fastest ways of mutating state on the BEAM, and with the add/sub API they make a great tool for concurrency (see the quick demo after this list).
I will be creating two different counters per station:
counter = :counters.new(2, [:atomics])
counter2 = :counters.new(2, [:write_concurrency])
- The name of each station, alongside the counter references, will be stored in :persistent_term. Why? Because once a station is added with its counters it never changes again, and :persistent_term gives us one of the fastest ways of fetching data.
:persistent_term.put(:erlang.phash2(name), {name, counter, counter2, mutex})
- Then a mutex for each station will be created, allowing only one process at a time to access a specific counter. With this mutex we can be 100% sure that the min and max values are correct.
- Finally, blocks of 300k lines will be run in parallel, 16 at a time, using Task.async_stream/3.
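As a quick demo of the :counters API this design leans on (throwaway values, just to show the calls):

c = :counters.new(2, [:atomics])  # 2 slots, all starting at 0
:counters.add(c, 1, 42)           # slot 1 += 42
:counters.sub(c, 1, 2)            # slot 1 -= 2
:counters.get(c, 1)               # => 40
:counters.get(c, 2)               # => 0

The [:write_concurrency] variant spreads the counter over per-scheduler cells, which makes concurrent writes faster at the price of reads that may not see the very latest writes. That is fine for the sum/count counter, which is only read at the very end.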
Mutex
The implementation of the mutex is really straightforward:
defp acquire(mutex) do
  if :counters.get(mutex, 1) == 0 do
    :counters.add(mutex, 1, 1)
  else
    acquire(mutex)
  end
end

defp release(mutex) do
  :counters.sub(mutex, 1, 1)
end
Yes, I know, it is a bit naive, but it does work. When a process fails to acquire the lock it simply retries as many times as the BEAM lets it. I decided not to add any sleep or pid-based synchronization, because in my tests it made no difference. (You can prove me wrong.) Strictly speaking the get-then-add pair is not atomic, so two processes could both read 0 and enter the critical section at the same time; it never bit me in practice, but the window is real.
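If you want to close that window, one option is to build the lock directly on :atomics, which exposes a real compare-and-swap. A minimal sketch, assuming the mutex is created with :atomics.new(1, signed: false) instead of :counters.new/2 (I have not benchmarked this variant):

# race-free spin lock: only one process can flip the slot from 0 to 1
defp acquire(mutex) do
  case :atomics.compare_exchange(mutex, 1, 0, 1) do
    :ok -> :ok               # we flipped 0 -> 1, the lock is ours
    _busy -> acquire(mutex)  # someone else holds it, retry
  end
end

defp release(mutex) do
  :atomics.put(mutex, 1, 0)
end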
Here is the whole thing:
defmodule Brc do
  @moduledoc false

  # 1_000_000_000 rows
  def run do
    "measurements.txt"
    |> File.read!()
    |> String.split("\n", trim: true)
    |> Stream.chunk_every(300_000)
    |> Task.async_stream(
      fn block -> Enum.each(block, &process_line/1) end,
      ordered: false,
      timeout: :infinity
    )
    |> Stream.run()

    read_terms_and_build_output()
  end

  defp process_line(line) do
    [name, measurement] = String.split(line, ";")

    # store temperatures as integers (hundredths of a degree) so the
    # counters can hold them; round (not trunc) because 0.7 * 100 is
    # 69.999... in floating point and trunc would turn it into 69
    value = round(String.to_float(measurement) * 100)

    case get_counters(name) do
      {:ok, counter, counter2, _mutex} ->
        # first measurement for this station: initialize the counters
        :counters.add(counter, 1, value)
        :counters.add(counter, 2, value)
        :counters.add(counter2, 1, value)
        :counters.add(counter2, 2, 1)

      {counter, counter2, mutex} ->
        # sum and count are plain atomic additions, no lock needed
        :counters.add(counter2, 1, value)
        :counters.add(counter2, 2, 1)

        # min and max are read-modify-write, so take the per-station lock
        acquire(mutex)

        min = :counters.get(counter, 1)
        max = :counters.get(counter, 2)

        if value < min do
          :counters.sub(counter, 1, min - value)
        end

        if value > max do
          :counters.add(counter, 2, value - max)
        end

        release(mutex)
    end
  end

  defp get_counters(name) do
    key = :erlang.phash2(name)
    {_name, counter, counter2, mutex} = :persistent_term.get(key)
    {counter, counter2, mutex}
  rescue
    # :persistent_term.get/1 raises for unknown keys, meaning this is the
    # first time we see this station: create and register its counters
    _ ->
      mutex = :counters.new(1, [:atomics])
      counter = :counters.new(2, [:atomics])
      counter2 = :counters.new(2, [:write_concurrency])
      :ok = :persistent_term.put(:erlang.phash2(name), {name, counter, counter2, mutex})
      {:ok, counter, counter2, mutex}
  end

  defp read_terms_and_build_output do
    :persistent_term.get()
    |> Enum.filter(fn {id, _} -> is_integer(id) end)
    |> Enum.map(fn {_id, {name, counter, counter2, _mutex}} ->
      Task.async(fn ->
        min = :counters.get(counter, 1)
        max = :counters.get(counter, 2)
        sum = :counters.get(counter2, 1)
        count = :counters.get(counter2, 2)
        {name, Float.round(min / 100, 2), Float.round(max / 100, 2), Float.round(sum / 100), count}
      end)
    end)
    |> Task.await_many(:infinity)
    |> Enum.sort_by(&elem(&1, 0))
    |> Enum.map(fn {name, min, max, sum, count} ->
      "#{name};#{min};#{Float.round(sum / count, 2)};#{max}\n"
    end)
    |> then(&File.write("output.txt", &1))
  end

  # naive spin lock: keep retrying until the counter looks free, then bump it
  defp acquire(mutex) do
    if :counters.get(mutex, 1) == 0 do
      :counters.add(mutex, 1, 1)
    else
      acquire(mutex)
    end
  end

  defp release(mutex) do
    :counters.sub(mutex, 1, 1)
  end
end
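If you want to try it yourself, a throwaway snippet like this (station names and line count picked arbitrarily) generates a small measurements.txt in the format the parser expects; then call Brc.run/0 from iex:

stations = ["Hamburg", "Bulawayo", "Palembang", "St. John's"]

lines =
  for _ <- 1..1_000_000 do
    temp = :rand.uniform() * 100.0 - 50.0
    # one "name;temp" pair per line, with exactly one decimal digit
    [Enum.random(stations), ?;, :erlang.float_to_binary(temp, decimals: 1), ?\n]
  end

File.write!("measurements.txt", lines)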
It takes about 18-20 seconds to do everything, 4 of which are just reading the file. It should scale easily with more cores.
In the end I feel this is really cheating: you could rewrite this code in C and the solution would be exactly the same. But the goal is to do it as fast as possible.
For my part, I don't think I will take this any further; I'm happy with it, and I had never even used counters until now. I don't think it can be improved much more without using macros to generate code that matches the dataset and things like that. You could try to read the file faster, or spawn the tasks in a different way; a streaming variant like the sketch below would be one starting point.
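Here is that streaming sketch (unbenchmarked; drop it next to run/0 in the module), which avoids holding the whole file in memory:

def run_streaming do
  "measurements.txt"
  |> File.stream!()                               # lazy, line by line
  |> Stream.map(&String.trim_trailing(&1, "\n"))  # File.stream! keeps the newline
  |> Stream.chunk_every(300_000)
  |> Task.async_stream(
    fn block -> Enum.each(block, &process_line/1) end,
    ordered: false,
    timeout: :infinity
  )
  |> Stream.run()

  read_terms_and_build_output()
end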
That's all from my part.