Randomized and Probabilistic Algorithms
Chapter 36: Randomized and Probabilistic Algorithms
You are asked in an interview: "How do you fairly draw 5 cards from a deck of 52?" You say "randomly pick one from the remaining cards each time." The interviewer follows up: "How do you prove that every possible 5-card combination is selected with exactly equal probability?" You freeze.
Consider another scenario: a data stream arrives continuously, you don't know the total number of records in advance, and memory can only hold k records. How do you guarantee that at any stopping point, the k records in memory are a uniform random sample of all data processed so far?
Randomization is not "just add some random numbers." It is a design paradigm that can make problems intractable for deterministic algorithms (adversarial inputs, massive data, distributed coordination) simple and elegant. This chapter starts with Fisher-Yates shuffling, dives into correctness proofs of reservoir sampling, and concludes with the broad applications of randomization in engineering systems.
Level 1 ยท What You Need to Know
1.1 Fisher-Yates Shuffle Algorithm
Problem: Given an array, generate a uniformly random permutation of all its elements (each permutation equally likely).
Wrong approach (many people's first instinct):
import random
def bad_shuffle(arr):
"""Wrong shuffle! Non-uniform probabilities"""
n = len(arr)
for i in range(n):
j = random.randint(0, n - 1) # pick from [0, n-1]
arr[i], arr[j] = arr[j], arr[i]
return arr
Why is this wrong? There are n^n possible swap sequences (n choices each step, n steps), but only n! permutations. Since n^n is typically not divisible by n! (e.g., 3^3 = 27, 3! = 6, 27/6 is not an integer), some permutations must be mapped to more often than others. The probabilities are non-uniform.
Correct approach: Fisher-Yates (a.k.a. Knuth Shuffle)
import random
def fisher_yates_shuffle(arr: list) -> list:
"""Fisher-Yates shuffle
Guarantees each permutation is equally likely
Time: O(n)
Space: O(1) (in-place)
"""
n = len(arr)
for i in range(n - 1, 0, -1):
# Pick uniformly from [0, i]
j = random.randint(0, i)
arr[i], arr[j] = arr[j], arr[i]
return arr
Correctness proof (by induction):
We need to show: after the algorithm finishes, each of the n! permutations occurs with probability exactly 1/n!.
Base case: n = 1, only one permutation, probability = 1 = 1/1!. Correct.
Inductive step: Assume Fisher-Yates produces uniform permutations for n-1 elements. For n elements:
First step (i = n-1): Pick j uniformly from [0, n-1], place arr[j] at position n-1. Each element ends up in the last position with probability 1/n.
Remaining steps: Perform Fisher-Yates on the first n-1 positions. By induction hypothesis, these n-1 positions form a uniform permutation (probability 1/(n-1)! each).
Combined: Probability of any specific permutation (pi_1, ..., pi_n) = P(pi_n is last) x P(first n-1 positions are (pi_1, ..., pi_{n-1})) = 1/n x 1/(n-1)! = 1/n!.
Alternative implementation (forward direction):
def fisher_yates_forward(arr: list) -> list:
"""Fisher-Yates forward version (equivalent)"""
n = len(arr)
for i in range(n):
# Pick uniformly from [i, n-1]
j = random.randint(i, n - 1)
arr[i], arr[j] = arr[j], arr[i]
return arr
These two versions are mathematically equivalent. The key invariant: at step i, the selection range must exactly cover the undecided positions.
1.2 Reservoir Sampling
Problem: Uniformly randomly select k elements from a data stream of unknown size. Can only traverse the data once, and memory holds only k elements.
Algorithm R (Vitter 1985):
import random
def reservoir_sampling(stream, k: int) -> list:
"""Reservoir Sampling - Algorithm R
Uniformly sample k elements from a stream of unknown size
Time: O(n), where n is the total stream size
Space: O(k)
"""
reservoir = []
for i, item in enumerate(stream):
if i < k:
# First k elements go directly into reservoir
reservoir.append(item)
else:
# For the (i+1)-th element (0-indexed i):
# Replace a random reservoir element with probability k/(i+1)
j = random.randint(0, i) # uniform on [0, i]
if j < k:
reservoir[j] = item
return reservoir
Correctness proof: We need to show that after processing n elements, each element is in the reservoir with probability exactly k/n.
By induction:
Base case: When n = k, all elements are in the reservoir. Probability = k/k = 1. Correct.
Inductive step: Assume after processing n-1 elements, each is in the reservoir with probability k/(n-1).
Processing the n-th element:
- It enters the reservoir with probability k/n. Correct for the new element.
- For an element x already in the reservoir:
- x is in reservoir (probability k/(n-1) by induction)
- x is not replaced = 1 - P(n-th enters) x P(x is chosen for replacement) = 1 - (k/n) x (1/k) = 1 - 1/n = (n-1)/n
- Final probability = k/(n-1) x (n-1)/n = k/n. Correct.
Special case k = 1 (select one from stream):
def reservoir_sampling_single(stream) -> object:
"""Reservoir sampling k=1: uniformly select one element from stream"""
result = None
for i, item in enumerate(stream):
# Select current element with probability 1/(i+1)
if random.randint(0, i) == 0:
result = item
return result
Intuition: The 1st element is selected with probability 1, the 2nd with 1/2 (1st survives with 1/2), the 3rd with 1/3 (earlier ones survive with 2/3), ..., the n-th with 1/n. Every element ends up selected with probability 1/n.
1.3 Randomized Quicksort
Standard quicksort's worst case is O(n^2) (when input is sorted and pivot is the first element). Random pivot selection avoids adversarial inputs.
import random
def randomized_quicksort(arr: list, low: int = 0, high: int = None) -> None:
"""Randomized Quicksort
Expected time: O(n log n) (for any input)
Worst-case time: O(n^2) (extremely unlikely, ~1/n!)
Space: O(log n) expected stack depth
"""
if high is None:
high = len(arr) - 1
if low < high:
pivot_idx = randomized_partition(arr, low, high)
randomized_quicksort(arr, low, pivot_idx - 1)
randomized_quicksort(arr, pivot_idx + 1, high)
def randomized_partition(arr: list, low: int, high: int) -> int:
"""Randomly select pivot and partition"""
pivot_idx = random.randint(low, high)
arr[pivot_idx], arr[high] = arr[high], arr[pivot_idx]
pivot = arr[high]
i = low - 1
for j in range(low, high):
if arr[j] <= pivot:
i += 1
arr[i], arr[j] = arr[j], arr[i]
arr[i + 1], arr[high] = arr[high], arr[i + 1]
return i + 1
Why randomization works: The problem with deterministic quicksort is that an adversary can construct degenerating inputs. Randomized pivot makes it impossible for the adversary to predict our choices.
Expected complexity analysis: Let T(n) be the expected comparisons for n elements. Random pivot equally likely splits into (0, n-1), (1, n-2), ..., (n-1, 0):
T(n) = n - 1 + (1/n) x sum_{i=0}^{n-1} [T(i) + T(n-1-i)]
Solving gives T(n) = 2nH_n - 4n approximately equal to 2nln(n) approximately equal to 1.39nlog_2(n).
Randomized quicksort's expected comparisons are only about 39% more than the theoretical optimum (n*log_2(n)), and this expectation holds for any input โ no matter how malicious.
Comparison with deterministic strategies:
| Strategy | Worst case | Adversary-resistant? |
|---|---|---|
| First element | O(n^2) | No (sorted input) |
| Middle element | O(n^2) | No (constructible) |
| Median of three | O(n^2) | No (harder but constructible) |
| Random | O(n^2) but tiny probability | Yes (input-independent) |
| Median of medians | O(n log n) deterministic | Yes (but large constant) |
1.4 Basic Requirements for Random Number Generation
Uniformity: Each possible value appears with equal probability.
Common error: using rand() % n for [0, n-1]
If RAND_MAX + 1 is not divisible by n, some values get one extra corresponding rand() output. For example, RAND_MAX = 32767: rand() % 10 gives 0-7 with probability 3277/32768 and 8-9 with probability 3276/32768. The bias is small but detectable in large samples.
Correct approach: rejection sampling
import random
def uniform_random(n: int) -> int:
"""Unbiased uniform random in [0, n-1]
Principle: reject values outside the uniform range
"""
RAND_MAX = 32767 # hypothetical
limit = RAND_MAX - (RAND_MAX + 1) % n
while True:
r = random.randint(0, RAND_MAX)
if r <= limit:
return r % n
1.5 Common Errors and Pitfalls
Error 1: Wrong range in shuffle
# Wrong: j should range [0, i] not [0, n-1]
for i in range(n - 1, 0, -1):
j = random.randint(0, n - 1) # Wrong! Should be randint(0, i)
arr[i], arr[j] = arr[j], arr[i]
Error 2: Index calculation in reservoir sampling
# Note: element at index i (0-indexed) should enter with probability k/(i+1)
# If using 1-indexed, it's k/i
for i, item in enumerate(stream, start=1): # 1-indexed
if i <= k:
reservoir.append(item)
else:
j = random.randint(1, i)
if j <= k:
reservoir[j - 1] = item
Error 3: Stack overflow in randomized quicksort
For large arrays (n > 10^6), recursion depth can cause stack overflow. Solutions:
- Tail recursion optimization: always recurse on the shorter subarray first
- Switch to insertion sort when subarray is below a threshold
- Use iterative approach with explicit stack
def quicksort_safe(arr, low, high):
"""Tail-recursion optimized quicksort"""
while low < high:
pivot = randomized_partition(arr, low, high)
if pivot - low < high - pivot:
quicksort_safe(arr, low, pivot - 1)
low = pivot + 1 # tail call optimization
else:
quicksort_safe(arr, pivot + 1, high)
high = pivot - 1
Level 2 ยท How It Works Under the Hood
2.1 Monte Carlo vs Las Vegas Algorithms
Randomized algorithms fall into two major categories:
Las Vegas algorithms:
- Result is always correct
- Running time is random (but finite in expectation)
- Examples: randomized quicksort, randomized pivot quickselect
Monte Carlo algorithms:
- Running time is deterministic (or bounded)
- Result may be incorrect (error probability is controllable)
- Examples: Miller-Rabin primality test, randomized min-cut
# Las Vegas example: randomized quickselect (k-th smallest)
def randomized_select(arr: list, k: int) -> int:
"""Find the k-th smallest element
Expected time: O(n)
Worst time: O(n^2) (extremely unlikely)
Result: always correct
"""
if len(arr) == 1:
return arr[0]
pivot = random.choice(arr)
left = [x for x in arr if x < pivot]
middle = [x for x in arr if x == pivot]
right = [x for x in arr if x > pivot]
if k <= len(left):
return randomized_select(left, k)
elif k <= len(left) + len(middle):
return pivot
else:
return randomized_select(right, k - len(left) - len(middle))
# Monte Carlo example: estimating pi
def monte_carlo_pi(num_samples: int) -> float:
"""Monte Carlo estimation of pi
Randomly throw points in a unit square, count fraction
landing inside the quarter circle.
pi/4 = (points inside) / (total points)
Error: O(1/sqrt(n)) โ need 4x samples to halve error
"""
inside = 0
for _ in range(num_samples):
x = random.random()
y = random.random()
if x * x + y * y <= 1:
inside += 1
return 4 * inside / num_samples
Key differences:
| Property | Las Vegas | Monte Carlo |
|---|---|---|
| Correctness | Guaranteed | Probabilistic |
| Time | Random (finite expectation) | Deterministic/bounded |
| Interconvertible? | Yes (set time limit, restart) | Conditional (need verifier) |
| Typical use | Sorting, selection | Primality, approximation |
Important conversion: Any Las Vegas algorithm can become Monte Carlo โ set a time limit, output "don't know" on timeout. Conversely, if a Monte Carlo output can be efficiently verified, it can become Las Vegas โ run repeatedly until verification passes.
2.2 SkipList's Random Level Height
SkipList is a randomized data structure proposed by William Pugh in 1990 as a probabilistic alternative to balanced search trees.
Core idea: Each node has a randomly determined "level height." A node of height h appears in all linked lists from level 1 to level h. The higher-level "express lanes" skip many nodes, achieving O(log n) expected search time.
import random
class SkipListNode:
def __init__(self, key, level):
self.key = key
self.forward = [None] * (level + 1)
class SkipList:
"""Skip List implementation
Expected operation time: O(log n)
Expected space: O(n)
"""
MAX_LEVEL = 32
P = 0.5 # promotion probability
def __init__(self):
self.header = SkipListNode(-float('inf'), self.MAX_LEVEL)
self.level = 0
def random_level(self) -> int:
"""Randomly determine new node's level height
Each level promotes with probability P. Expected height = 1/(1-P)
With P = 0.5, expected height = 2
"""
lvl = 0
while random.random() < self.P and lvl < self.MAX_LEVEL:
lvl += 1
return lvl
def search(self, key) -> bool:
"""Search for key"""
current = self.header
for i in range(self.level, -1, -1):
while current.forward[i] and current.forward[i].key < key:
current = current.forward[i]
current = current.forward[0]
return current is not None and current.key == key
def insert(self, key) -> None:
"""Insert key"""
update = [None] * (self.MAX_LEVEL + 1)
current = self.header
for i in range(self.level, -1, -1):
while current.forward[i] and current.forward[i].key < key:
current = current.forward[i]
update[i] = current
new_level = self.random_level()
if new_level > self.level:
for i in range(self.level + 1, new_level + 1):
update[i] = self.header
self.level = new_level
new_node = SkipListNode(key, new_level)
for i in range(new_level + 1):
new_node.forward[i] = update[i].forward[i]
update[i].forward[i] = new_node
Why P = 0.5 is a good choice:
- Probability of height h is P^(h-1) * (1-P) = (1/2)^h (geometric distribution)
- Expected height = 1/(1-P) = 2 (when P=0.5)
- Expected maximum height among n nodes = log_{1/P}(n) = log_2(n)
- Search time: expected 1/P = 2 steps per level, log_2(n) levels, total O(log n)
P value trade-offs:
| P | Expected height | Space multiplier | Search comparisons |
|---|---|---|---|
| 1/2 | 2 | 2n | log_2(n) + 2 |
| 1/4 | 4/3 | (4/3)n | log_2(n)/2 + 4 |
| 1/e | ~1.58 | 1.58n | Optimal (but marginal) |
In practice, P = 0.5 or P = 0.25 are both common. Redis's sorted sets (zset) use P = 0.25, MAX_LEVEL = 32.
SkipList vs Balanced BST engineering trade-offs:
| Property | SkipList | Red-Black/AVL |
|---|---|---|
| Implementation complexity | Simple | Complex (rotations) |
| Concurrency-friendly | Good (fine-grained locking) | Poor (rotations affect many nodes) |
| Range queries | Natural | Requires in-order traversal |
| Cache-friendliness | Poor (pointer chasing) | Also poor (slightly better) |
| Worst case | O(n) (tiny probability) | O(log n) deterministic |
2.3 Randomization in Load Balancing
Problem: Given n servers and m arriving requests, how to distribute requests for balanced load?
Method 1: Pure random assignment
Each request independently picks a server uniformly at random. By the "balls into bins" model, when m = n, the most loaded server receives an expected Theta(log n / log log n) requests.
Method 2: Power of Two Choices (Mitzenmacher et al., 2001)
Each request randomly picks 2 servers, then goes to the one with lighter current load. This simple improvement reduces maximum load from Theta(log n / log log n) to Theta(log log n) โ an exponential improvement!
import random
class PowerOfTwoChoices:
"""Power of Two Choices load balancing
Maximum load drops from O(log n / log log n) to O(log log n)
"""
def __init__(self, num_servers: int):
self.loads = [0] * num_servers
self.n = num_servers
def assign_request(self) -> int:
"""Assign one request, return chosen server index"""
s1 = random.randint(0, self.n - 1)
s2 = random.randint(0, self.n - 1)
while s2 == s1:
s2 = random.randint(0, self.n - 1)
if self.loads[s1] <= self.loads[s2]:
self.loads[s1] += 1
return s1
else:
self.loads[s2] += 1
return s2
Why does 2 choices make such a big difference?
Intuition: With pure random, once a server is "ahead" (above average load), it accumulates new requests at the same rate as others. But with Power of Two Choices, heavily loaded servers are less likely to be chosen (at least one of two random picks is likely lighter), creating automatic negative feedback.
Mathematically, this is an instance of exponential tail decay: each additional "choice dimension" causes extreme values' tail probabilities to decay exponentially.
Method 3: Consistent Hashing
When the same key must always route to the same server (e.g., caching):
import hashlib
import bisect
class ConsistentHashing:
"""Consistent Hashing
Supports dynamic server addition/removal
Each change affects only O(1/n) of keys
Uses virtual nodes for load uniformity
"""
def __init__(self, num_virtual_nodes: int = 150):
self.num_virtual = num_virtual_nodes
self.ring = []
self.ring_map = {}
def _hash(self, key: str) -> int:
return int(hashlib.md5(key.encode()).hexdigest(), 16)
def add_server(self, server: str) -> None:
for i in range(self.num_virtual):
h = self._hash(f"{server}#{i}")
bisect.insort(self.ring, h)
self.ring_map[h] = server
def get_server(self, key: str) -> str:
if not self.ring:
return None
h = self._hash(key)
idx = bisect.bisect_left(self.ring, h) % len(self.ring)
return self.ring_map[self.ring[idx]]
2.4 Randomized Skip: From O(n) to O(sqrt(n))
An interesting technique: in reservoir sampling, most elements are rejected. Can we skip the elements that are destined to be rejected?
Vitter's Algorithm Z (1985): Instead of checking one by one, directly compute the position of the "next selected element," thereby skipping all elements in between.
import random
import math
def reservoir_sampling_optimized(stream_size: int, k: int):
"""Simplified version of Vitter's Algorithm Z
Key optimization: compute next replacement position, skip intermediate elements
For large streams (n >> k), this significantly reduces random number generations
"""
reservoir = list(range(k))
W = math.exp(math.log(random.random()) / k)
next_pos = k + int(math.floor(math.log(random.random()) / math.log(1 - W))) + 1
i = k
while i < stream_size:
if i == next_pos:
reservoir[random.randint(0, k - 1)] = i
W *= math.exp(math.log(random.random()) / k)
next_pos = i + int(math.floor(math.log(random.random()) / math.log(1 - W))) + 1
i += 1
return reservoir
When n >> k, Algorithm Z needs only O(k * (1 + log(n/k))) random number generations, versus O(n) for Algorithm R.
2.5 Derandomization of Randomized Algorithms
An interesting theoretical question: can randomized algorithms be "derandomized" โ converted to deterministic algorithms without losing efficiency?
Method of Conditional Expectations:
If a randomized algorithm's expected result satisfies some property (e.g., expected solution >= OPT/2), we can greedily fix random bits one by one, ensuring the conditional expectation never decreases, ultimately achieving the expected value deterministically.
# Example: MAX-SAT randomized -> derandomized
# Randomized: each variable independently True/False with prob 1/2
# Expected satisfied clauses = m/2
# Derandomized: for each variable, choose the value that maximizes
# conditional expected satisfied clause count
def derandomized_maxsat(clauses: list, num_vars: int) -> list:
"""Derandomized MAX-SAT approximation
Guarantees satisfying at least m/2 clauses
Time: O(n * m)
"""
assignment = [None] * (num_vars + 1)
for var in range(1, num_vars + 1):
exp_true = conditional_expected_satisfied(clauses, assignment, var, True)
exp_false = conditional_expected_satisfied(clauses, assignment, var, False)
assignment[var] = exp_true >= exp_false
return assignment
def conditional_expected_satisfied(clauses, assignment, var, value):
"""Compute conditional expectation with fixed variables + current decision + rest random"""
assignment[var] = value
expected = 0
for clause in clauses:
satisfied = False
undetermined_count = 0
for literal in clause:
v = abs(literal)
if assignment[v] is not None:
if (literal > 0 and assignment[v]) or (literal < 0 and not assignment[v]):
satisfied = True
break
else:
undetermined_count += 1
if satisfied:
expected += 1
elif undetermined_count > 0:
expected += 1 - (0.5 ** undetermined_count)
assignment[var] = None
return expected
2.6 Randomness in Hash Functions
Universal Hashing (Carter & Wegman, 1979):
Randomly select a hash function from a family, guaranteeing that for any two distinct keys, the collision probability is at most 1/m (m is table size).
import random
class UniversalHash:
"""Universal hash family
h(x) = ((a*x + b) mod p) mod m
where p is a large prime, a in [1, p-1], b in [0, p-1] chosen randomly
Collision guarantee: for any x != y, P[h(x) = h(y)] <= 1/m
"""
def __init__(self, m: int, p: int = (1 << 61) - 1):
self.m = m
self.p = p # Mersenne prime 2^61 - 1
self.a = random.randint(1, p - 1)
self.b = random.randint(0, p - 1)
def hash(self, x: int) -> int:
return ((self.a * x + self.b) % self.p) % self.m
Why universal hashing matters: A fixed hash function (like hash(x) = x % m) always has adversarial inputs that cause all keys to collide. But with a randomly chosen function from the family, the adversary cannot predict which function was selected, so adversarial inputs are impossible. This makes the hash table's worst-case expectation O(1).
Level 3 ยท What the Specifications Say
3.1 Fisher and Yates' Original Method (1938)
Ronald A. Fisher and Frank Yates first described this algorithm in their 1938 book Statistical Tables for Biological, Agricultural and Medical Research. Their original method was designed for manual execution:
- Write down numbers 1 to n
- Generate a random number in [1, k] (k is the count of remaining numbers)
- Strike out the k-th remaining number, write it in the result sequence
- Repeat until all numbers are struck out
This "strike-out" version has O(n^2) time complexity (finding the k-th remaining number takes O(n) each time).
Knuth's improvement (1969): Donald Knuth gave the now widely-used O(n) in-place version in The Art of Computer Programming Volume 2 โ using swaps instead of "striking out." He describes this as Algorithm P (Section 3.4.2) and proves its correctness. This is what we now call the Fisher-Yates-Knuth shuffle.
Durstenfeld (1964) actually independently published the in-place version earlier (in Communications of the ACM), but Knuth's textbook popularized it.
Formal correctness requirement:
A shuffle algorithm is "uniform" if and only if for every permutation pi among the n! possibilities, the algorithm outputs pi with probability exactly 1/n!.
Fisher-Yates satisfies this because:
- The algorithm makes n-1 independent choices
- Choice i has (n-i+1) options (picking from [0, n-i])
- Total choice sequences = n x (n-1) x ... x 2 x 1 = n!
- Each choice sequence is equally likely (each 1/n!)
- Different choice sequences produce different permutations (bijection)
3.2 Knuth Shuffle Implementation Details
Knuth's Algorithm P from TAOCP Vol. 2 (1969):
Algorithm P (Shuffling).
Given a sequence of n elements a[0], a[1], ..., a[n-1]:
P1. Set j <- n - 1.
P2. Generate a uniform random number U between 0 and 1.
Set k <- floor((j+1)*U). (Now k is uniform on {0, 1, ..., j}.)
P3. Exchange a[k] <-> a[j].
P4. Decrease j by 1. If j > 0, go to P2. Otherwise, terminate.
Knuth noted these key properties:
-
Each permutation corresponds to exactly one choice sequence: (k_{n-1}, k_{n-2}, ..., k_1) where 0 <= k_j <= j. This is a representation in the "factorial number system."
-
Coupling with the random number generator: The algorithm's correctness depends on U being truly uniform. If using a PRNG, its period must be at least n!. For 52 cards (52! is approximately 8x10^67), at least 226 bits of PRNG state are needed โ Mersenne Twister (period 2^19937) is sufficient, but a simple linear congruential generator (32-bit state, period 2^32) is far from enough.
3.3 Vitter's 1985 Reservoir Sampling Paper
Jeffrey S. Vitter published the classic paper "Random Sampling with a Reservoir" (ACM Transactions on Mathematical Software, 1985), systematically analyzing multiple reservoir sampling algorithms:
Algorithm R (simplest version):
- First k elements enter directly
- Element i (i > k) replaces a random reservoir element with probability k/i
- Each element requires one random number generation
- Total: n-k random number generations
Algorithm X:
- Compute the "skip distance" to the next replacement
- Skip distance S satisfies P(S > s) = product_{j=0}^{s} (1 - k/(t+1+j)), where t is current position
- Needs O(k(1+log(n/k))) random number generations
- But computing S itself takes O(S) time
Algorithm Z (optimal):
- Uses rejection sampling to efficiently generate skip distances
- Key insight: when n >> k, skip distance approximately follows a distribution that can be quickly sampled
- Expected time O(k(1+log(n/k)))
- Expected random number generations also O(k(1+log(n/k)))
Vitter's contribution was not just the algorithms but the rigorous probabilistic analysis. He proved Algorithm Z is optimal (in the comparison model, uniform sampling cannot be done with fewer random bits).
Weighted reservoir sampling variant:
A-Res algorithm (Efraimidis & Spirakis, 2006): Each element i has weight w_i, selected with probability proportional to weight.
import heapq
import random
def weighted_reservoir_sampling(stream, k: int) -> list:
"""Weighted Reservoir Sampling (A-Res algorithm)
Each element (item, weight) is selected proportional to weight
"""
heap = [] # (key, item)
for item, weight in stream:
# key = random^(1/weight) โ larger weight means key more likely large
key = random.random() ** (1.0 / weight)
if len(heap) < k:
heapq.heappush(heap, (key, item))
elif key > heap[0][0]:
heapq.heapreplace(heap, (key, item))
return [item for _, item in heap]
3.4 Computational Complexity Classification of Randomized Algorithms
BPP (Bounded-Error Probabilistic Polynomial Time):
A decision problem is in BPP if there exists a probabilistic polynomial-time algorithm such that for any input:
- If answer is YES, algorithm outputs YES with probability >= 2/3
- If answer is NO, algorithm outputs NO with probability >= 2/3
(2/3 can be amplified to 1 - 2^(-k) via repeated runs + majority vote)
RP (Randomized Polynomial Time):
- If answer is YES, algorithm outputs YES with probability >= 1/2
- If answer is NO, algorithm always outputs NO (no false positives)
ZPP (Zero-Error Probabilistic Polynomial Time):
- Algorithm always gives the correct answer
- Expected running time is polynomial
- ZPP = RP intersect coRP
Relationships: P is a subset of ZPP, which is a subset of RP, which is a subset of BPP, which is a subset of PSPACE.
It is widely conjectured that P = BPP (all efficient randomized algorithms can be derandomized), but this remains unproven. Impagliazzo & Wigderson (1997) showed: if certain hardness assumptions hold (e.g., E = DTIME(2^O(n)) contains problems with exponential circuit complexity), then P = BPP.
3.5 Theoretical Foundations of Pseudorandom Number Generators
Cryptographically Secure PRNG (CSPRNG) requirements:
- Given the first k output bits, the (k+1)-th bit cannot be predicted in polynomial time
- Given the internal state, previous outputs cannot be recovered (forward secrecy)
Common PRNG comparison:
| PRNG | Period | Security | Speed | Use case |
|---|---|---|---|---|
| LCG (linear congruential) | 2^32 | Insecure | Very fast | Not recommended |
| Mersenne Twister | 2^19937 | Insecure (reversible) | Fast | Scientific computing |
| xorshift128+ | 2^128 | Insecure | Very fast | Games, non-security |
| ChaCha20 | 2^256 | Crypto-secure | Medium | Security scenarios |
| /dev/urandom | Implementation-dependent | Secure | Slow | Key generation |
Python random number sources:
randommodule: Mersenne Twister, not suitable for securitysecretsmodule: OS-level CSPRNG, suitable for securityos.urandom(n): directly gets n bytes of secure random data
Level 4 ยท Edge Cases and Interview Problems
4.1 Interview Problem: Shuffle an Array (LeetCode #384)
Problem: Given an integer array nums, design an algorithm to randomly shuffle the array. Implement the Solution class:
Solution(int[] nums)initializes the objectint[] reset()resets to initial stateint[] shuffle()returns randomly shuffled result
import random
class Solution:
"""LeetCode 384: Shuffle an Array
Fisher-Yates for guaranteed uniformity
"""
def __init__(self, nums: list):
self.original = nums[:]
self.current = nums[:]
def reset(self) -> list:
self.current = self.original[:]
return self.current
def shuffle(self) -> list:
n = len(self.current)
for i in range(n - 1, 0, -1):
j = random.randint(0, i)
self.current[i], self.current[j] = self.current[j], self.current[i]
return self.current
Interview follow-ups:
-
"How would you verify your shuffle is uniform?"
- For small arrays (e.g., [1,2,3]), run a million times, count each permutation's frequency; should be close to 1/n! = 1/6 approximately 16.67%
- Chi-squared test quantifies the deviation
-
"If the array is huge (10^8 elements), how to shuffle only the first k?"
- Execute only the first k steps of Fisher-Yates for k uniformly random elements
- This is essentially "randomly select k from n"
-
"What if you cannot modify the original array?"
- Use O(n) extra space to copy then shuffle
- Or use inside-out version (no in-place modification needed)
def inside_out_shuffle(original: list) -> list:
"""Inside-out Fisher-Yates (doesn't modify original)"""
n = len(original)
result = [0] * n
for i in range(n):
j = random.randint(0, i)
if j != i:
result[i] = result[j]
result[j] = original[i]
return result
4.2 Interview Problem: Linked List Random Node (LeetCode #382)
Problem: Given a singly linked list, return a random node's value. Each node must have the same probability of being chosen.
Analysis: Direct application of reservoir sampling with k=1. The list length is unknown (or we don't want to traverse twice), and we need to complete in one pass.
import random
class ListNode:
def __init__(self, val=0, next=None):
self.val = val
self.next = next
class Solution:
"""LeetCode 382: Linked List Random Node
Reservoir sampling k=1
"""
def __init__(self, head: ListNode):
self.head = head
def getRandom(self) -> int:
result = self.head.val
node = self.head.next
i = 2 # currently at 2nd node
while node:
# Select current node with probability 1/i
if random.randint(1, i) == 1:
result = node.val
node = node.next
i += 1
return result
Correctness verification (3 nodes: A -> B -> C):
- A selected: step 2 doesn't pick B (prob 1/2) AND step 3 doesn't pick C (prob 2/3) = 1 x 1/2 x 2/3 = 1/3
- B selected: step 2 picks B (prob 1/2) AND step 3 doesn't pick C (prob 2/3) = 1/2 x 2/3 = 1/3
- C selected: step 3 picks C (prob 1/3) = 1/3
All 1/3. Correct.
Interview follow-ups:
- "What if you need k random nodes?" โ Use reservoir sampling with k > 1
- "What if you can know the list length?" โ Count length n first, generate random [0, n-1], walk to that position
- "What if getRandom is called very frequently?" โ First traversal stores all values in an array, then O(1) random access
4.3 Interview Problem: Random Pick Index (LeetCode #398)
Problem: Given an integer array that may contain duplicates, randomly output the index of a given target number. Each index should be returned with equal probability.
import random
class Solution:
"""LeetCode 398: Random Pick Index
Method 1: Reservoir sampling (O(n) time, O(1) extra space)
Method 2: Preprocess hash map (O(1) query, O(n) space)
"""
def __init__(self, nums: list):
self.nums = nums
def pick(self, target: int) -> int:
"""Reservoir sampling method"""
result = -1
count = 0
for i, num in enumerate(self.nums):
if num == target:
count += 1
if random.randint(1, count) == 1:
result = i
return result
class SolutionOptimized:
"""Preprocessed version โ suitable for many queries"""
def __init__(self, nums: list):
from collections import defaultdict
self.indices = defaultdict(list)
for i, num in enumerate(nums):
self.indices[num].append(i)
def pick(self, target: int) -> int:
return random.choice(self.indices[target])
Method selection:
- Few pick calls (e.g., once): reservoir sampling, O(1) init + O(n) per query
- Many pick calls: preprocess hash map, O(n) init + O(1) per query
Interview discussion points:
- Reservoir sampling advantage: no extra space, suitable for streaming data
- Hash map advantage: O(1) query
- Follow-up: what if the array changes dynamically (insert/delete)?
4.4 Testing and Verifying Randomized Algorithms
How do you gain confidence that your randomized algorithm is correct? This is much harder than for deterministic algorithms.
Method 1: Statistical testing
from collections import Counter
import math
def verify_shuffle(shuffle_func, arr, num_trials=1000000):
"""Verify shuffle uniformity"""
n = len(arr)
expected_count = num_trials / math.factorial(n)
counts = Counter()
for _ in range(num_trials):
result = shuffle_func(arr[:])
counts[tuple(result)] += 1
# Chi-squared test
observed = list(counts.values())
# Fill missing permutations with 0
while len(observed) < math.factorial(n):
observed.append(0)
# All values should be close to expected_count
max_deviation = max(abs(o - expected_count) / expected_count for o in observed)
print(f"Max relative deviation: {max_deviation:.4f}")
print(f"Uniformity: {'PASS' if max_deviation < 0.05 else 'FAIL'}")
Method 2: Invariant checking
def verify_reservoir_sampling(sample_func, stream_size, k, num_trials=100000):
"""Verify reservoir sampling uniformity"""
counts = [0] * stream_size
for _ in range(num_trials):
sample = sample_func(range(stream_size), k)
for idx in sample:
counts[idx] += 1
expected = num_trials * k / stream_size
max_deviation = max(abs(c - expected) / expected for c in counts)
print(f"Max relative deviation: {max_deviation:.4f}")
print(f"Uniformity: {'PASS' if max_deviation < 0.05 else 'FAIL'}")
4.5 Randomization in Practical Engineering
1. Randomization in databases:
- MySQL's
ORDER BY RAND()does a full table scan + sort (O(n log n)) โ very slow - Better: use ID range + random offset, or pre-generate random ID list
2. Randomization in distributed systems:
- Exponential Backoff: random wait time on network retries, avoiding thundering herd
- Jitter: add random offset to fixed intervals, preventing simultaneous timer fires
import random
def exponential_backoff_with_jitter(attempt: int, base_delay: float = 1.0,
max_delay: float = 60.0) -> float:
"""Exponential backoff with jitter
Used in distributed system retry logic
"""
# Full Jitter (recommended by AWS)
delay = min(max_delay, base_delay * (2 ** attempt))
return random.uniform(0, delay)
3. Randomization in A/B testing:
- Hash of user ID determines group (deterministic routing, same user always same group)
- Hash function choice affects group uniformity
4. Multiple hashes in Bloom Filters:
- k independent hash functions -> false positive rate = (1 - e^(-kn/m))^k
- Optimal k = (m/n) * ln 2
4.6 Interview Response Strategy
When encountering randomization problems in interviews:
-
Identify the problem type:
- "Equal probability" / "uniformly random" -> Fisher-Yates / reservoir sampling
- "Data stream" / "unknown size" -> reservoir sampling
- "Probability proportional to weight" -> weighted reservoir / roulette selection
- "Avoid worst case" -> randomized pivot / universal hashing
-
Standard pattern for proving correctness:
- For shuffles: prove each permutation has probability = 1/n!
- For sampling: prove each element is selected with probability = k/n
- Usually by mathematical induction
-
Space-time trade-offs:
- Reservoir sampling: O(k) space, O(n) time
- Preprocessing: O(n) space, O(1) query time
- Choice depends on query frequency and memory constraints
-
Common follow-ups:
- "How to verify uniformity?" -> Statistical testing (chi-squared)
- "What if the RNG is biased?" -> Rejection sampling
- "How to optimize for very large streams?" -> Vitter's Algorithm Z (skip-based sampling)