Skip to content

Commit

Permalink
Update dask "compute" benchmark (#50)
Browse files Browse the repository at this point in the history
* update dask benchmark to use aggregator API

* rename file

* update comment
  • Loading branch information
rjzamora authored Jan 25, 2024
1 parent 06afeee commit 1ee3de4
Showing 1 changed file with 11 additions and 13 deletions.
24 changes: 11 additions & 13 deletions examples/dask_compute_bench.py → examples/dask_aggregate_bench.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2023 NVIDIA CORPORATION
# Copyright 2023-2024 NVIDIA CORPORATION
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -18,19 +18,19 @@
from dask_cuda import LocalCUDACluster
from distributed import Client, LocalCluster

from crossfit.calculate.frame import MetricFrame
from crossfit.dask.calculate import calculate_per_col as calculate_dask
from crossfit.stats.continuous.stats import ContinuousStats
import crossfit as cf
from crossfit.backend.dask.aggregate import aggregate
from crossfit.metric.continuous.moments import Moments

# Benchmark assumes Criteo dataset.
# Low-cardinality columns:
# {C6:4, C9:64, C13:11, C16:155, C17:4, C19:15, C25:109, C26:37}

# Options
path = "/raid/dask-space/criteo/crit_pq_int"
path = "/datasets/rzamora/crit_pq_int"
backend = "cudf"
split_row_groups = 10
ncolumns = 10
blocksize = "265MiB"
ncolumns = 4
groupby = None
use_cluster = True

Expand All @@ -55,21 +55,19 @@
columns += groupby if isinstance(groupby, list) else [groupby]
ddf = dd.read_parquet(
path,
split_row_groups=split_row_groups,
blocksize=blocksize,
columns=columns,
)
print(f"\nddf: {ddf}\n")

# Calculate continuous stats
metric = ContinuousStats()
# Aggregate moments (mean, var, std)
agg = cf.Aggregator(Moments(axis=0), per_column=True)
t0 = time.time()
mf: MetricFrame = calculate_dask(metric, ddf, groupby=groupby)
result = aggregate(ddf, agg, to_frame=True)
tf = time.time()
print(f"\nWall Time: {tf-t0} seconds\n")

# View result
assert isinstance(mf, MetricFrame)
result = mf.result()
print(f"Result:\n{result}\n")
print(f"Type: {type(result)}\n")

Expand Down

0 comments on commit 1ee3de4

Please sign in to comment.