Elias-Fano Encoding
Elias-Fano is an encoding algorithm designed to compress non-decreasing sequences of integers while maintaining constant-time random access. It was independently proposed by Peter Elias and Robert Mario Fano in the 1970s.
At first glance, this does not look like a very useful thing, but it turns out to be extremely practical. One common use case is search engines, where we need to compress indexes that point to variable-length data.
Take a book like "Alice in Wonderland", and let's say we want to find all positions of a given word in the text as fast as possible. One straightforward approach is to precompute every position of every word in the text and store them in a map. Something like this:
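(A minimal sketch in Python; the short sentence below is a stand-in for the actual book text.)

```python
import collections, re

# Map each word to the list of token positions where it occurs.
text = "the cat sat on the mat the end"
index = collections.defaultdict(list)
for position, word in enumerate(re.findall(r'\b\w+\b', text.lower())):
    index[word].append(position)

print(index["the"])  # -> [0, 4, 6]: every position of "the", already sorted
```

Note that each positions list is sorted and non-decreasing by construction, which is exactly the kind of sequence Elias-Fano compresses.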
Now let's estimate how much space it would take to store indexes for the top 500 most frequent words in this book. We can write a simple Python script that builds the index and calculates the total size, assuming we store positions in a bit array; in our case each position takes ceil(log2(max_index)) = 15 bits.
```python
import re, requests, string, math, collections

alice_text = requests.get("https://www.gutenberg.org/files/11/11-0.txt").text
tokens = re.findall(r'\b\w+\b', alice_text.lower())
freq = collections.Counter(tokens).most_common(500)
size = sum(count for _, count in freq)
max_index = len(tokens) - 1
print(f"Index size: {(math.ceil(math.log2(max_index)) * size) / 8 / 1024:.2f}KB")
```
Index size: 42.08KB
Now let’s do the same calculation for Elias-Fano encoded indexes. For this we will use a simple formula, which I will explain later.
```python
import re, requests, string, math, collections

alice_text = requests.get("https://www.gutenberg.org/files/11/11-0.txt").text
tokens = re.findall(r'\b\w+\b', alice_text.lower())
freq = collections.Counter(tokens)
top_words = set(w for w, _ in freq.most_common(500))

elias_fano = collections.defaultdict(lambda: (0, 0))
index, max_index = collections.defaultdict(list), 0
for i, token in enumerate(tokens):
    if token in top_words:
        index[token].append(i)
        max_index = i
        count, _ = elias_fano[token]
        elias_fano[token] = (count + 1, i)

def ef_size(n, u, to_int):
    l = to_int(math.log2(u / n))
    h = to_int(math.log2(u)) - l
    hb_size, lb_size = n + 2**h, l * n
    return hb_size + hb_size * 0.15 + lb_size + 64

elias_fano_c_size, elias_fano_f_size = 0, 0
for n, u in elias_fano.values():
    elias_fano_c_size += ef_size(n, u, math.ceil)
    elias_fano_f_size += ef_size(n, u, math.floor)

print(f"Elias Fano C: {elias_fano_c_size / 8 / 1024:.2f}KB")
print(f"Elias Fano F: {elias_fano_f_size / 8 / 1024:.2f}KB")
```
Elias Fano C: 33.05KB
Elias Fano F: 30.24KB
I also reserved some additional space here: about +15% of the HighBits array for a constant-time select data structure.
So it looks like we can get 20-30% compression without losing the ability to randomly access any element of the array.
So how can we build an Elias-Fano compressed array? It is surprisingly simple if we start with naive access instead of constant-time access.
I'll put the diagram here and explain everything on it below.
We start with a sorted array, shown at the top of the diagram. In this example the array length is n = 7, and the maximum value (the last element) is m, which fits in 5 bits. The binary representation of each number is shown underneath.
First, we choose the number of low bits: l = ceil(log2(m / n)). In this example that gives l = 2. Then we take the l least significant bits from each number and store them as-is, preserving order (green path in the diagram). This forms the LowBits bitarray.
The remaining bits are the high bits: h = ceil(log2(m)) - l. Here h = 3.
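To make the split concrete, here is a tiny Python illustration; the value 13 and the choice l = 2 are my own example, not taken from the diagram:

```python
value = 0b01101  # 13
l = 2            # number of low bits

low = value & ((1 << l) - 1)  # keep the l least significant bits -> 0b01
high = value >> l             # the remaining high bits -> 0b011 (bucket 3)

# The original value can always be reassembled from the two parts.
assert (high << l) | low == value
print(f"low = {low:0{l}b}, high = {high:b}")
```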
If we keep only the h most significant bits of each number, we get a sorted sequence (red path in the diagram). There are only 2^h = 8 possible values, and we will refer to them as buckets.
Next, each element is assigned to a bucket using its high bits, and we count how many elements fall into each bucket. These counts obviously sum to n.
We then encode the bucket counts using unary encoding: a bucket with k elements is written as k ones followed by a single zero. Concatenating all the encoded counts gives us the HighBits bitvector.
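As a quick sanity check, we can build such a HighBits string for some made-up bucket counts (the counts below are my own example, not the diagram's):

```python
def unary(counts):
    # Each bucket with k elements becomes k ones followed by a single zero.
    return "".join("1" * k + "0" for k in counts)

counts = [2, 0, 3, 1]  # 4 buckets holding 6 elements in total
hb = unary(counts)
print(hb)  # -> "1100111010": 6 ones, 4 zeros, 10 bits
```

Note that empty buckets still contribute their terminating zero, which is what keeps the encoding decodable.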
Important properties of HighBits:
- It contains exactly n ones (one per element)
- It contains exactly 2^h zeros (one per bucket)
- The number of bits is n + 2^h
- 2^h is close to n, since h = ceil(log2(m)) - ceil(log2(m / n)) is approximately log2(n)
- Usually, we should expect the number of bits to be close to 2n
- The number of bits is at most about 2n if we use l = ceil(log2(m / n)) and at most about 3n if we use l = floor(log2(m / n))
These properties should help build intuition about the compression.
The final Elias-Fano representation consists of two bitvectors:
- HighBits (hb): n + 2^h bits
- LowBits (lb): l * n bits

The total size is n + 2^h + l * n bits.
In this example, that is 7 + 8 + 14 = 29 bits, compared to 7 * 5 = 35 bits for the original binary array.
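The same arithmetic works for any sorted array; the array below is my own example, and l and h follow the ceil formulas used throughout this article:

```python
from math import ceil, log2

data = [3, 4, 7, 13, 14, 15, 21, 43]  # sorted, n = 8, m = 43
n, m = len(data), data[-1]

l = ceil(log2(m / n))           # low bits per element -> 3
h = ceil(log2(m)) - l           # remaining high bits -> 3
ef_bits = n + 2 ** h + l * n    # HighBits + LowBits -> 8 + 8 + 24 = 40
plain_bits = ceil(log2(m)) * n  # plain fixed-width array -> 6 * 8 = 48

print(l, h, ef_bits, plain_bits)
```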
Now the most important part: How to access the encoded data?
Everything is easy with the LowBits array: the low bits of the i-th element are simply bits i * l through (i + 1) * l - 1 of the flat LowBits array.
But how to get high bits from this unary encoded array?
We will use an operation called select1: select1(k) returns the position of the k-th set bit in a bitvector.
To get the high bits of the i-th element we perform select1(i + 1) - i. The (i + 1)-th one in HighBits is the bit we wrote for that element, and it is preceded by exactly i ones (the previous elements) plus one zero for every bucket that closed before it. Subtracting i therefore leaves the number of preceding zeros, which is the element's bucket index, i.e. its high bits.
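Here is that identity in miniature, with a naive select1 over a list of bits. The toy data is my own: for the sorted array [1, 4, 5, 7] with l = 1, the high bits are [0, 2, 2, 3], the bucket counts are [1, 0, 2, 1], and the unary encoding is 10 0 110 10:

```python
def select1(bits, k):
    # Position of the k-th set bit (1-indexed), found by linear scan.
    count = 0
    for pos, b in enumerate(bits):
        count += b
        if b and count == k:
            return pos
    return None

hb = [1, 0, 0, 1, 1, 0, 1, 0]  # HighBits for [1, 4, 5, 7] with l = 1

# select1(i + 1) - i recovers each element's high bits.
for i, expected_high in enumerate([0, 2, 2, 3]):
    assert select1(hb, i + 1) - i == expected_high
print("ok")
```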
Let's now compose everything together. As the main implementation language I'll use Zig, since I wrote the actual implementation in Zig, but I'll also provide a Python version below.
We'll use std.DynamicBitSetUnmanaged for bit arrays. It provides set, unset, isSet, and an iterator over set bits - everything we need.
First, the naive select1 - a linear scan using the bitset iterator. In the next article we will implement a data structure that can do this in constant time.
```zig
fn naiveSelect1(bitset: DynamicBitSet, k: usize) ?usize {
    if (k == 0) return null;
    var count: usize = 0;
    var it = bitset.iterator(.{});
    while (it.next()) |i| {
        count += 1;
        if (count == k) return i;
    }
    return null;
}
```
Now the encoding. We compute l and h, allocate two bitsets, then walk through the sorted data. For each value, we write its high bits in unary into hb and its low bits directly into lb:
```zig
fn init(allocator: Allocator, data: []const u64) !EliasFano {
    const m = data[data.len - 1];
    const n = data.len;
    const l: u6 = @intFromFloat(@ceil(
        @log2(@as(f64, @floatFromInt(m)) / @as(f64, @floatFromInt(n))),
    ));
    const h: u6 = @intCast(std.math.log2_int_ceil(u64, m) - l);

    var hb = try DynamicBitSet.initEmpty(allocator, n + (@as(usize, 1) << h));
    errdefer hb.deinit(allocator);
    var lb = try DynamicBitSet.initEmpty(allocator, @as(usize, l) * n);
    errdefer lb.deinit(allocator);

    var bucket: u64 = 0;
    var h_bits_index: usize = 0;
    var l_bits_index: usize = 0;
    for (data) |value| {
        // High bits: unary encoding.
        // We write a 1 for each element in that bucket.
        // Each time we encounter a new bucket, we leave a 0.
        const h_bits = value >> l;
        while (h_bits > bucket) {
            h_bits_index += 1;
            bucket += 1;
        }
        hb.set(h_bits_index);
        h_bits_index += 1;

        // Low bits: store l least significant bits, MSB first.
        var bit_idx: u6 = 0;
        while (bit_idx < l) : (bit_idx += 1) {
            if ((value >> (l - 1 - bit_idx)) & 1 == 1) {
                lb.set(l_bits_index);
            }
            l_bits_index += 1;
        }
    }
    return .{ .m = m, .n = n, .l = l, .h = h, .hb = hb, .lb = lb };
}
```
And decoding: select1 gives us the high bits, then we read l bits from the low bits array:
```zig
fn get(self: EliasFano, i: usize) !u64 {
    if (i >= self.n) return error.IndexOutOfBounds;
    const h_bits = naiveSelect1(self.hb, i + 1).? - i;

    var l_bits: u64 = 0;
    const start = i * @as(usize, self.l);
    for (start..(start + @as(usize, self.l))) |bit_pos| {
        l_bits = (l_bits << 1) | @as(u1, @bitCast(self.lb.isSet(bit_pos)));
    }
    return (h_bits << self.l) | l_bits;
}
```
Here is the full code:
```zig
const std = @import("std");
const Allocator = std.mem.Allocator;
const DynamicBitSet = std.DynamicBitSetUnmanaged;

const EliasFano = struct {
    m: u64,
    n: usize,
    l: u6,
    h: u6,
    hb: DynamicBitSet,
    lb: DynamicBitSet,

    fn init(allocator: Allocator, data: []const u64) !EliasFano {
        const m = data[data.len - 1];
        const n = data.len;
        const l: u6 = @intFromFloat(@ceil(
            @log2(@as(f64, @floatFromInt(m)) / @as(f64, @floatFromInt(n))),
        ));
        const h: u6 = @intCast(std.math.log2_int_ceil(u64, m) - l);

        var hb = try DynamicBitSet.initEmpty(allocator, n + (@as(usize, 1) << h));
        errdefer hb.deinit(allocator);
        var lb = try DynamicBitSet.initEmpty(allocator, @as(usize, l) * n);
        errdefer lb.deinit(allocator);

        var bucket: u64 = 0;
        var h_bits_index: usize = 0;
        var l_bits_index: usize = 0;
        for (data) |value| {
            const h_bits = value >> l;
            while (h_bits > bucket) {
                h_bits_index += 1;
                bucket += 1;
            }
            hb.set(h_bits_index);
            h_bits_index += 1;

            var bit_idx: u6 = 0;
            while (bit_idx < l) : (bit_idx += 1) {
                if ((value >> (l - 1 - bit_idx)) & 1 == 1) {
                    lb.set(l_bits_index);
                }
                l_bits_index += 1;
            }
        }
        return .{ .m = m, .n = n, .l = l, .h = h, .hb = hb, .lb = lb };
    }

    fn deinit(self: *EliasFano, allocator: Allocator) void {
        self.hb.deinit(allocator);
        self.lb.deinit(allocator);
        self.* = undefined;
    }

    fn get(self: EliasFano, i: usize) !u64 {
        if (i >= self.n) return error.IndexOutOfBounds;
        const h_bits = naiveSelect1(self.hb, i + 1).? - i;

        var l_bits: u64 = 0;
        const start = i * @as(usize, self.l);
        for (start..(start + @as(usize, self.l))) |bit_pos| {
            l_bits = (l_bits << 1) | @as(u1, @bitCast(self.lb.isSet(bit_pos)));
        }
        return (h_bits << self.l) | l_bits;
    }

    fn bitsize(self: EliasFano) usize {
        return self.n + (@as(usize, 1) << self.h) + @as(usize, self.l) * self.n;
    }
};

fn test_elias_fano(allocator: Allocator, size: usize, min_val: u64, max_val: u64) !void {
    const arr = try randomSortedArray(allocator, size, min_val, max_val);
    defer allocator.free(arr);
    var compressed = try EliasFano.init(allocator, arr);
    defer compressed.deinit(allocator);

    for (0..arr.len) |i| {
        const value = try compressed.get(i);
        if (value != arr[i]) {
            std.debug.print("Compression failed, expected {d}, got {d} at index {d}\n", .{ arr[i], value, i });
            return error.CompressionFailed;
        }
    }

    const min_size: f64 = @ceil(@log2(@as(f64, @floatFromInt(arr[arr.len - 1])))) * @as(f64, @floatFromInt(arr.len));
    const compressed_size: f64 = @floatFromInt(compressed.bitsize());
    const ratio = (min_size - compressed_size) / min_size;
    std.debug.print("Compression: {d:.2}% ({d} bits vs {d} bits)\n", .{
        ratio * 100.0,
        compressed.bitsize(),
        @as(u64, @intFromFloat(min_size)),
    });
}

pub fn main() !void {
    const allocator = std.heap.smp_allocator;
    try test_elias_fano(allocator, 100, 0, 10000);
}

fn naiveSelect1(bitset: DynamicBitSet, k: usize) ?usize {
    if (k == 0) return null;
    var count: usize = 0;
    var it = bitset.iterator(.{});
    while (it.next()) |i| {
        count += 1;
        if (count == k) return i;
    }
    return null;
}

fn randomSortedArray(allocator: Allocator, size: usize, min_val: u64, max_val: u64) ![]u64 {
    var prng: std.Random.DefaultPrng = .init(blk: {
        var seed: u64 = undefined;
        try std.posix.getrandom(std.mem.asBytes(&seed));
        break :blk seed;
    });
    const rand = prng.random();
    var pool = try allocator.alloc(u64, size);
    errdefer allocator.free(pool);
    for (0..size) |i| pool[i] = rand.intRangeAtMost(u64, min_val, max_val);
    std.sort.pdq(u64, pool, {}, comptime std.sort.asc(u64));
    return pool;
}
```
Compression: 33.71% (928 bits vs 1400 bits)
Python version
```python
from math import log2, ceil
import random


# BitArray - backing store for both high and low bit arrays
class BitArray:
    def __init__(self, size):
        self.size = size
        self.data = bytearray((size + 7) // 8)

    def set(self, index):
        self.data[index // 8] |= (1 << (index % 8))

    def clear(self, index):
        self.data[index // 8] &= ~(1 << (index % 8))

    def get(self, index):
        return (self.data[index // 8] >> (index % 8)) & 1

    def naive_select_1(self, k):
        count = 0
        for i in range(self.size):
            if self.get(i) == 1:
                count += 1
                if count == k:
                    return i
        return -1


# Elias-Fano encoding for sorted integer sequences
class EliasFano:
    def __init__(self, data):
        self.m = data[-1]
        self.n = len(data)
        self.l = ceil(log2(self.m / self.n))
        self.h = ceil(log2(self.m)) - self.l
        self.hb = BitArray(self.n + 2 ** self.h)
        self.lb = BitArray(self.l * self.n)

        bucket = 0
        h_bits_index = 0
        l_bits_index = 0
        for value in data:
            # High bits: unary encoding, a 0 closes each bucket
            h_bits = value >> self.l
            while h_bits > bucket:
                self.hb.clear(h_bits_index)
                h_bits_index += 1
                bucket += 1
            self.hb.set(h_bits_index)
            h_bits_index += 1

            # Low bits: store the l least significant bits, MSB first
            for bit_pos in range(self.l - 1, -1, -1):
                if (value >> bit_pos) & 1:
                    self.lb.set(l_bits_index)
                else:
                    self.lb.clear(l_bits_index)
                l_bits_index += 1

    def bitsize(self):
        return self.n + 2 ** self.h + self.l * self.n

    def get(self, i):
        h_bits = self.hb.naive_select_1(i + 1) - i
        l_bits = 0
        for bit_pos in range(i * self.l, (i + 1) * self.l):
            l_bits = (l_bits << 1) | self.lb.get(bit_pos)
        return (h_bits << self.l) | l_bits


# Test: encode random sorted array, verify decoding, print compression
def test(size=100, min_val=0, max_val=10000):
    arr = random.sample(range(min_val, max_val + 1), size)
    arr.sort()
    compressed = EliasFano(arr)
    for i in range(len(arr)):
        if compressed.get(i) != arr[i]:
            raise ValueError(f"Failed at index {i}")
    min_size = ceil(log2(arr[-1])) * len(arr)
    compressed_size = compressed.bitsize()
    ratio = (min_size - compressed_size) / min_size
    print(f'Compression: {ratio * 100:.2f}% ({compressed_size} bits vs {min_size} bits)')


test()
```