feat(tpcds-benchmarking): Add basic tpcds benchmarking for local testing (#3509)

# Overview
This PR enables TPC-DS benchmarking on your local computer.

## Usage

```sh
DAFT_RUNNER=native python -m benchmarking.tpcds --questions "3"
```
Raunak Bhagat authored Dec 10, 2024
1 parent 092c354 commit ba46d07
Showing 6 changed files with 308 additions and 11 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -16,6 +16,7 @@ build

**/node_modules/**
data/**
benchmarking/tpcds/data
*.so
*.whl
log/
Empty file added benchmarking/tpcds/__init__.py
189 changes: 189 additions & 0 deletions benchmarking/tpcds/__main__.py
@@ -0,0 +1,189 @@
import argparse
import logging
import typing
from dataclasses import dataclass
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional

import ray

import daft

from ..tpch import __main__ as tpch
from ..tpch import ray_job_runner
from . import datagen, helpers

logger = logging.getLogger(__name__)

SQL_QUERIES_PATH = Path(__file__).parent / "queries"


@dataclass
class ParsedArgs:
tpcds_gen_folder: Path
scale_factor: float
questions: str
ray_address: Optional[str]
dry_run: bool


@dataclass
class RunArgs:
scaled_tpcds_gen_folder: Path
query_indices: list[int]
ray_address: Optional[str]
dry_run: bool


@dataclass
class Result:
index: int
duration: Optional[timedelta]
error_msg: Optional[str]

def __repr__(self) -> str:
if self.duration and self.error_msg:
typing.assert_never("Both duration and error_msg are not None")
elif self.duration:
return f"(Q{self.index} SUCCESS - duration: {self.duration})"
elif self.error_msg:
return f"(Q{self.index} FAILURE - error msg: {self.error_msg})"
else:
typing.assert_never("Both duration and error_msg are None")


def run_query_on_ray(
run_args: RunArgs,
) -> list[Result]:
ray.init(address=run_args.ray_address if run_args.ray_address else None)
results = []

for query_index in run_args.query_indices:
working_dir = Path("benchmarking") / "tpcds"
ray_entrypoint_script = "ray_entrypoint.py"
duration = None
error_msg = None
try:
start = datetime.now()
ray_job_runner.run_on_ray(
run_args.ray_address,
{
"entrypoint": f"python {ray_entrypoint_script} --tpcds-gen-folder 'data/0.01' --question {query_index} {'--dry-run' if run_args.dry_run else ''}",
"runtime_env": {
"working_dir": working_dir,
},
},
)
end = datetime.now()
duration = end - start
except Exception as e:
error_msg = str(e)

results.append(Result(index=query_index, duration=duration, error_msg=error_msg))

return results


def run_query_on_local(
run_args: RunArgs,
) -> list[Result]:
catalog = helpers.generate_catalog(run_args.scaled_tpcds_gen_folder)
results = []

for query_index in run_args.query_indices:
query_file = SQL_QUERIES_PATH / f"{query_index:02}.sql"
with open(query_file) as f:
query = f.read()

start = datetime.now()

duration = None
error_msg = None
try:
daft.sql(query, catalog=catalog).explain(show_all=True)
if not run_args.dry_run:
daft.sql(query, catalog=catalog).collect()

end = datetime.now()
duration = end - start
except Exception as e:
error_msg = str(e)

results.append(Result(index=query_index, duration=duration, error_msg=error_msg))

return results


def run_benchmarks(
run_args: RunArgs,
) -> list[Result]:
logger.info(
"Running the following questions: %s",
run_args.query_indices,
)

runner = tpch.get_daft_benchmark_runner_name()

logger.info(
"Running on the following runner: %s",
runner,
)

if runner == "ray":
return run_query_on_ray(run_args)
elif runner == "py" or runner == "native":
return run_query_on_local(run_args)
else:
typing.assert_never(runner)


def main(args: ParsedArgs):
scaled_tpcds_gen_folder = args.tpcds_gen_folder / str(args.scale_factor)
datagen.gen_tpcds(scaled_tpcds_gen_folder, args.scale_factor)
query_indices = helpers.parse_questions_str(args.questions)
results = run_benchmarks(
RunArgs(
scaled_tpcds_gen_folder=scaled_tpcds_gen_folder,
query_indices=query_indices,
ray_address=args.ray_address,
dry_run=args.dry_run,
)
)

# TODO(ronnie): improve visualization of results; simply printing them to console is not the best way...
print(f"{results=}")


if __name__ == "__main__":
logging.basicConfig(level="INFO")

parser = argparse.ArgumentParser()
parser.add_argument(
"--tpcds-gen-folder",
default="benchmarking/tpcds/data",
type=Path,
help="Path to the folder containing the TPC-DS dsdgen tool and generated data",
)
parser.add_argument("--scale-factor", default=0.01, type=float, help="Scale factor to run on in GB")
parser.add_argument("--questions", default="*", type=str, help="The questions to run")
parser.add_argument("--ray-address", type=str, help="The address of the head node of the ray cluster")
parser.add_argument(
"--dry-run",
action="store_true",
help="Whether to run in dry-run mode; if true, only the plan will be printed, but no query will be executed",
)
args = parser.parse_args()

tpcds_gen_folder: Path = args.tpcds_gen_folder
assert args.scale_factor > 0

main(
ParsedArgs(
tpcds_gen_folder=tpcds_gen_folder,
scale_factor=args.scale_factor,
questions=args.questions,
ray_address=args.ray_address,
dry_run=args.dry_run,
)
)
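
The same flow can also be driven programmatically by constructing `ParsedArgs` directly instead of going through `argparse`. A minimal sketch (paths and values are illustrative; the runner is still chosen by `DAFT_RUNNER` via `tpch.get_daft_benchmark_runner_name()`):

```python
from pathlib import Path

from benchmarking.tpcds.__main__ import ParsedArgs, main

# Dry-run questions 1-5 against a scale-factor-0.01 dataset; the data is
# generated by datagen.gen_tpcds on first use if the scaled folder is missing.
main(
    ParsedArgs(
        tpcds_gen_folder=Path("benchmarking/tpcds/data"),
        scale_factor=0.01,
        questions="1-5",
        ray_address=None,  # only consulted when DAFT_RUNNER=ray
        dry_run=True,      # print plans only, skip execution
    )
)
```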
33 changes: 22 additions & 11 deletions benchmarking/tpcds/datagen.py
@@ -1,39 +1,50 @@
import argparse
import logging
import os
from pathlib import Path

import duckdb

logger = logging.getLogger(__name__)


def gen_tpcds(basedir: str, scale_factor: float):
if not os.path.exists(basedir):
os.makedirs(basedir)
db = duckdb.connect(f"{basedir}/tpcds.db")
def gen_tpcds(dir: Path, scale_factor: float):
if dir.exists():
assert dir.is_dir(), "The location in which to generate the data must be a directory"
logger.info(
"The directory %s already exists; doing nothing",
dir,
)
return

dir.mkdir(parents=True, exist_ok=True)
db = duckdb.connect(database=dir / "tpcds.db")
db.sql(f"call dsdgen(sf = {scale_factor})")
for item in db.sql("show tables").fetchall():
tbl = item[0]
print(f"Exporting {tbl} to {basedir}/{tbl}.parquet")
db.sql(f"COPY {tbl} TO '{basedir}/{tbl}.parquet'")
parquet_file = dir / f"{tbl}.parquet"
print(f"Exporting {tbl} to {parquet_file}")
db.sql(f"COPY {tbl} TO '{parquet_file}'")


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"--tpcds-gen-folder",
default="data/tpcds-dbgen",
type=Path,
help="Path to the folder containing the TPC-DS dsdgen tool and generated data",
)
parser.add_argument("--scale-factor", default=0.01, help="Scale factor to run on in GB", type=float)

args = parser.parse_args()
num_parts = args.scale_factor

tpcds_gen_folder: Path = args.tpcds_gen_folder
assert args.scale_factor > 0

logger.info(
"Generating data at %s with: scale_factor=%s",
args.tpcds_gen_folder,
tpcds_gen_folder,
args.scale_factor,
)

gen_tpcds(basedir=args.tpcds_gen_folder, scale_factor=args.scale_factor)
scaled_tpcds_gen_folder = tpcds_gen_folder / str(args.scale_factor)
gen_tpcds(dir=scaled_tpcds_gen_folder, scale_factor=args.scale_factor)
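
After generation, each TPC-DS table ends up as one Parquet file next to `tpcds.db` in the scale-factor subfolder. A quick sanity check with Daft, assuming the default scale factor of 0.01 and the `benchmarking/tpcds/data` folder used by `__main__.py`:

```python
from pathlib import Path

import daft

# gen_tpcds writes one <table>.parquet per TPC-DS table into the scaled folder.
data_dir = Path("benchmarking/tpcds/data") / "0.01"

store_sales = daft.read_parquet(str(data_dir / "store_sales.parquet"))
store_sales.show(5)
```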
43 changes: 43 additions & 0 deletions benchmarking/tpcds/helpers.py
@@ -0,0 +1,43 @@
from pathlib import Path

import daft
from daft.sql.sql import SQLCatalog


def generate_catalog(dir: Path):
if not dir.exists():
raise RuntimeError(f"Directory not found: {dir}")
return SQLCatalog(
tables={
file.stem: daft.read_parquet(path=str(file))
for file in dir.iterdir()
if file.is_file() and file.suffix == ".parquet"
}
)


def parse_questions_str(questions: str) -> list[int]:
if questions == "*":
return list(range(1, 100))

nums = set()
for split in filter(lambda str: str, questions.split(",")):
try:
num = int(split)
nums.add(num)
except ValueError:
ints = split.split("-")
assert (
len(ints) == 2
), f"A range must include two numbers split by a dash (i.e., '-'); instead got '{split}'"
[lower, upper] = ints
try:
lower = int(lower)
upper = int(upper)
assert lower <= upper
for index in range(lower, upper + 1):
nums.add(index)
except ValueError:
raise ValueError(f"Invalid range: {split}")

return nums
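
A small illustration of the two helpers above (the data directory is illustrative and assumes the dataset has already been generated):

```python
from pathlib import Path

import daft

from benchmarking.tpcds import helpers

# Question strings mix single indices, comma-separated lists, and dash ranges.
print(helpers.parse_questions_str("1,3,5-7"))  # question indices 1, 3, 5, 6, 7

# generate_catalog keys each Parquet file by its stem, so TPC-DS table names
# can be used directly in SQL.
catalog = helpers.generate_catalog(Path("benchmarking/tpcds/data/0.01"))
daft.sql("SELECT COUNT(*) FROM store_sales", catalog=catalog).show()
```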
53 changes: 53 additions & 0 deletions benchmarking/tpcds/ray_entrypoint.py
@@ -0,0 +1,53 @@
import argparse
import sys
from pathlib import Path

import helpers

import daft


def run(
parquet_folder: Path,
question: int,
dry_run: bool,
):
catalog = helpers.generate_catalog(parquet_folder)
query_file = Path(__file__).parent / "queries" / f"{question:02}.sql"
with open(query_file) as f:
query = f.read()

try:
daft.sql(query, catalog=catalog).explain(show_all=True)
if not dry_run:
daft.sql(query, catalog=catalog).collect()
except Exception as e:
print(str(e), file=sys.stderr)


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"--tpcds-gen-folder",
required=True,
type=Path,
help="Path to the TPC-DS data generation folder",
)
parser.add_argument(
"--question",
required=True,
type=int,
help="The TPC-DS question index to run",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Whether or not to run the query in dry-run mode; if true, only the plan will be printed out",
)
args = parser.parse_args()

tpcds_gen_folder: Path = args.tpcds_gen_folder
assert tpcds_gen_folder.exists()
assert args.question in range(1, 100)

run(args.tpcds_gen_folder, args.question, args.dry_run)
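
The entrypoint is normally submitted by `run_query_on_ray` via `ray_job_runner.run_on_ray`; a rough standalone equivalent using Ray's job-submission client (the dashboard address is illustrative, and the exact mechanics of `run_on_ray` may differ):

```python
from ray.job_submission import JobSubmissionClient

# Ship benchmarking/tpcds as the job's working directory so ray_entrypoint.py
# can `import helpers` and resolve its queries/ and data/ paths.
client = JobSubmissionClient("http://127.0.0.1:8265")
job_id = client.submit_job(
    entrypoint="python ray_entrypoint.py --tpcds-gen-folder 'data/0.01' --question 3 --dry-run",
    runtime_env={"working_dir": "benchmarking/tpcds"},
)
print(job_id)
```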
