# "=== benchmark_mongodb3.1.py ==="
# author: opensource@dwaves.de
# date-creation: 2025-03-10
# date-last-test: 2025-03-10
# licence gpl 3.0
#
## description: as-simple-as-possible python based mongodb performance measuring tool:
# xts + xtc when migrating gam partition perform fast on some systems and slow on other systems
# to rule out that xts or xtc is the problem, this independent program tests mongodb performance via python
# how fast or slow can write (insert) change (update) read (read) delete operations work?
# NOTE: read performance is not measured by the code below (only insert, update, delete).
#
## usage?
# apt -y install python3-pymongo; # install requirements
# time /usr/bin/python3 /scripts/benchmark_mongodb3.1.py; # run it like

import json
import platform
import subprocess
import time
from datetime import datetime

import pymongo

# Defaults used when the script is run without arguments.
DEFAULT_URI = "mongodb://localhost:27017/"
DEFAULT_BATCH_SIZES = (1, 10, 100, 10000, 1000000)


def get_system_info():
    """Print the Python, OS, kernel and MongoDB versions to stdout.

    The MongoDB version is taken from the first output line of
    ``mongod --version``. This is best-effort: if the binary is missing or
    exits with an error, a placeholder text is printed instead.
    """
    python_version = platform.python_version()
    os_version = platform.system() + " " + platform.release()
    kernel_version = platform.version()

    try:
        raw_output = subprocess.check_output(["mongod", "--version"])
    except (OSError, subprocess.SubprocessError):
        # mongod not installed / not on PATH / returned a non-zero exit code.
        mongo_version = "Could not determine MongoDB version"
    else:
        mongo_version = raw_output.decode(errors="replace").split("\n")[0]

    print(f"Python Version: {python_version}")
    print(f"OS Version: {os_version}")
    print(f"Kernel Version: {kernel_version}")
    print(f"MongoDB Version: {mongo_version}\n")


def _timed(func, *args, **kwargs):
    """Call ``func(*args, **kwargs)`` and return the elapsed time in seconds."""
    # perf_counter is monotonic and high resolution; time.time() can jump
    # when the system clock is adjusted.
    start = time.perf_counter()
    func(*args, **kwargs)
    return time.perf_counter() - start


def _rate(record_count, seconds):
    """Return records per second, or ``inf`` when no time was measured."""
    return record_count / seconds if seconds > 0 else float("inf")


def benchmark_mongodb(uri=DEFAULT_URI, batch_sizes=DEFAULT_BATCH_SIZES):
    """Measure insert, update and delete speed of a MongoDB server.

    For every batch size ``n`` a full cycle is run against an empty
    collection: insert ``n`` documents, update all of them, delete all of
    them. Each reported timing therefore covers exactly ``n`` documents.
    Results are printed to stdout. The temporary benchmark database is
    dropped and the client is closed even if the benchmark fails.

    Args:
        uri: MongoDB connection string.
        batch_sizes: Iterable of positive integers, one benchmark cycle each.

    Raises:
        ValueError: If ``batch_sizes`` is empty or contains a value below 1.
    """
    sizes = list(batch_sizes)
    if not sizes or any(n < 1 for n in sizes):
        raise ValueError("batch_sizes must contain at least one value, all >= 1")

    client = pymongo.MongoClient(uri)
    db_name = datetime.now().strftime("benchmark_%Y%m%d_%H%M%S")
    collection = client[db_name]["benchmark_collection"]

    def insert_records(docs):
        # Keep the single-document path so n == 1 measures insert_one.
        if len(docs) > 1:
            collection.insert_many(docs)
        else:
            collection.insert_one(docs[0])

    def update_records():
        collection.update_many({}, {"$set": {"test": "record number #23212 was changed"}})

    def delete_records():
        collection.delete_many({})

    try:
        timings = []  # one (n, insert_s, update_s, delete_s) tuple per batch size
        for n in sizes:
            # Build the documents outside the timed region so the measurement
            # is not dominated by Python list construction.
            docs = [
                {"test": f"this is record number #{i} of the basic python based mongodb benchmark"}
                for i in range(n)
            ]
            insert_s = _timed(insert_records, docs)
            update_s = _timed(update_records)
            delete_s = _timed(delete_records)
            timings.append((n, insert_s, update_s, delete_s))

        total_records = sum(sizes)
        avg_insert_rate = _rate(total_records, sum(t[1] for t in timings))
        avg_update_rate = _rate(total_records, sum(t[2] for t in timings))
        avg_delete_rate = _rate(total_records, sum(t[3] for t in timings))

        print("Benchmark Results:")
        for n, insert_s, update_s, delete_s in timings:
            print(f"It took {insert_s:.2f} seconds to insert {n} records")
            print(f"It took {update_s:.2f} seconds to update {n} records")
            print(f"It took {delete_s:.2f} seconds to delete {n} records\n")

        print(f"Avg Insert Rate: {avg_insert_rate:.2f} records/sec")
        print(f"Avg Update Rate: {avg_update_rate:.2f} records/sec")
        print(f"Avg Delete Rate: {avg_delete_rate:.2f} records/sec")

        # Show all databases before deletion
        print("show all databases, before deleting test database " + db_name)
        print(json.dumps(client.list_database_names()))
    finally:
        # Always clean up: drop the benchmark database and close the client.
        try:
            client.drop_database(db_name)
            print(f"Database {db_name} has been deleted.")
        except pymongo.errors.PyMongoError as exc:
            # Do not mask the original benchmark error with a cleanup error.
            print(f"Could not delete database {db_name}: {exc}")
        finally:
            client.close()


if __name__ == "__main__":
    get_system_info()
    benchmark_mongodb()