Locust Setup Guide

Basic locustfile.py

import logging

from locust import HttpUser, task, between
from locust.exception import StopUser

logger = logging.getLogger(__name__)


class WebsiteUser(HttpUser):
    """Simulated site visitor: logs in once, then browses, views, and searches."""

    # Wait between 1 and 3 seconds between tasks
    wait_time = between(1, 3)

    # Host — can also be set via --host flag
    host = "https://api.example.com"

    def on_start(self):
        """Called when a new user starts (login, setup).

        Logs in and stores the bearer token on the session so every later
        request is authenticated. If the login fails, the user is stopped
        instead of crashing on a missing "token" key or running the tasks
        unauthenticated.
        """
        response = self.client.post("/auth/login", json={
            "username": "testuser",
            "password": "testpass",
        })
        # Only parse the body when the request succeeded; an error page may
        # not be JSON at all.
        token = response.json().get("token") if response.ok else None
        if not token:
            logger.error("Login failed (HTTP %s); stopping user", response.status_code)
            raise StopUser()
        self.client.headers.update({"Authorization": f"Bearer {token}"})

    @task(3)  # weight 3 — runs 3x more than weight-1 tasks
    def browse_products(self):
        """Fetch the product listing."""
        self.client.get("/products")

    @task(1)
    def view_product(self):
        """Open the first product from the listing, if there is one."""
        listing = self.client.get("/products")
        if not listing.ok:
            return  # the failed request is already recorded by Locust
        products = listing.json()
        if not products:
            return  # empty catalogue: nothing to view
        product_id = products[0]["id"]
        # name= groups all product-detail URLs under one stats entry
        self.client.get(f"/products/{product_id}", name="/products/[id]")

    @task(2)
    def search(self):
        """Run a fixed search query."""
        self.client.get("/search?q=widget")

TaskSet — Grouped Behavior

from locust import HttpUser, TaskSet, task, between, constant, constant_pacing


class UserBrowsingBehavior(TaskSet):
    """Grouped listing tasks: view the list, view one item, create an item."""

    @task(5)
    def view_listing(self):
        self.client.get("/listings")

    @task(2)
    def view_single(self):
        self.client.get("/listings/1")

    @task(1)
    def create_listing(self):
        self.client.post("/listings", json={"title": "Test"})

    @task(1)
    def stop(self):
        """Hand control back to the parent.

        Without an interrupt() call, a nested TaskSet keeps running forever
        and the parent's other tasks are never scheduled again.
        """
        self.interrupt()

    def on_stop(self):
        """Called when this TaskSet stops."""
        pass


class BrowsingUser(HttpUser):
    """User whose whole behavior is the browsing TaskSet."""

    tasks = [UserBrowsingBehavior]
    wait_time = between(0.5, 2)


# Nested TaskSets
class AdminTaskSet(TaskSet):
    """Admin-only tasks. Defined before AdminBehavior, which references it.

    The endpoint below is a placeholder; replace it with real admin routes.
    """

    @task(3)
    def view_admin(self):
        self.client.get("/admin")

    @task(1)
    def stop(self):
        """Return to the parent TaskSet."""
        self.interrupt()


class AdminBehavior(TaskSet):
    """Parent TaskSet that picks a nested TaskSet by weight (3:1)."""

    tasks = {UserBrowsingBehavior: 3, AdminTaskSet: 1}


# wait_time options — alternatives; in a real locustfile pick ONE and set it
# as a class attribute on the User (or TaskSet).
wait_time = constant(1)         # exactly 1 second
wait_time = constant_pacing(2)  # each task starts at most once every 2 sec
wait_time = between(1, 5)       # random 1-5 seconds

Running Locust

# Web UI mode (open http://localhost:8089)
locust -f locustfile.py

# Headless mode
locust -f locustfile.py \
  --headless \
  --users 100 \
  --spawn-rate 10 \
  --run-time 2m \
  --host https://api.example.com

# Export CSV results
# (-u = --users, -r = --spawn-rate, -t = --run-time)
locust -f locustfile.py \
  --headless -u 50 -r 5 -t 1m \
  --csv results/run1 \
  --html results/report.html

# Stop with exit code based on failures
locust -f locustfile.py \
  --headless -u 100 -r 10 -t 5m \
  --exit-code-on-error 1

# Config file: locust.conf
# headless = true
# users = 100
# spawn-rate = 10

Custom Load Shape

from locust import LoadTestShape

# NOTE(review): a test run is driven by a single shape class. If both classes
# below stay in one locustfile, confirm which one Locust selects, or keep each
# in its own file.


class StepLoadShape(LoadTestShape):
    """Ramp up in steps.

    Adds ``step_load`` users every ``step_time`` seconds, starting with the
    first step at t=0, and stops the test after ``time_limit`` seconds.
    """

    step_time = 30    # seconds spent on each step
    step_load = 20    # users added per step
    spawn_rate = 10   # users started per second when ramping to a new step
    time_limit = 300  # total test duration in seconds

    def tick(self):
        """Return ``(user_count, spawn_rate)`` for now, or ``None`` to stop."""
        run_time = self.get_run_time()
        if run_time > self.time_limit:
            return None  # stop
        # +1 so the first step already has step_load users (otherwise the
        # first step_time seconds run with zero users); int() because the
        # floor division of a float run time yields a float user count.
        current_step = int(run_time // self.step_time) + 1
        return (current_step * self.step_load, self.spawn_rate)


class DoubleWaveShape(LoadTestShape):
    """Two waves of load.

    Each stage applies until its ``duration`` (seconds since test start,
    cumulative) is reached; after the last stage the test stops.
    """

    stages = [
        {"duration": 60, "users": 10, "spawn_rate": 5},
        {"duration": 120, "users": 50, "spawn_rate": 10},
        {"duration": 180, "users": 10, "spawn_rate": 5},
        {"duration": 240, "users": 70, "spawn_rate": 20},
        {"duration": 300, "users": 0, "spawn_rate": 10},
    ]

    def tick(self):
        """Return ``(users, spawn_rate)`` of the current stage, or ``None``."""
        run_time = self.get_run_time()
        for stage in self.stages:
            if run_time < stage["duration"]:
                return (stage["users"], stage["spawn_rate"])
        return None  # past the final stage: stop the test

Distributed Testing

# Master node (starts web UI and distributes work)
locust -f locustfile.py --master --host https://api.example.com

# Worker nodes (run on separate machines)
locust -f locustfile.py --worker --master-host=192.168.1.100

# Multiple workers on same machine (each runs in the background)
locust -f locustfile.py --worker --master-host=localhost &
locust -f locustfile.py --worker --master-host=localhost &
locust -f locustfile.py --worker --master-host=localhost &

# Docker Compose distributed setup
# The volume mount makes the local locustfile.py visible inside the
# containers at /mnt/locust; without it the -f path does not exist.
# services:
#   master:
#     image: locustio/locust
#     ports: ["8089:8089"]
#     volumes: ["./:/mnt/locust"]
#     command: -f /mnt/locust/locustfile.py --master
#   worker:
#     image: locustio/locust
#     volumes: ["./:/mnt/locust"]
#     command: -f /mnt/locust/locustfile.py --worker --master-host master
#     deploy:
#       replicas: 4

Events & Custom Assertions

from locust import events


@events.test_start.add_listener
def on_test_start(environment, **kwargs):
    """Announce the beginning of the run."""
    print("Load test starting!")


@events.test_stop.add_listener
def on_test_stop(environment, **kwargs):
    """Print aggregate totals and flag the run as failed above 1% errors."""
    stats = environment.stats.total
    print(f"Total requests: {stats.num_requests}")
    print(f"Failure rate: {stats.fail_ratio:.1%}")

    # Fail if error rate too high
    error_rate_too_high = stats.fail_ratio > 0.01
    if error_rate_too_high:
        environment.process_exit_code = 1


# Request event
@events.request.add_listener
def on_request(request_type, name, response_time, response_length, exception, **kwargs):
    """Report each request that finished with an exception."""
    if not exception:
        return
    print(f"Request failed: {name} — {exception}")