From 1ee3de4af3aa543ded1041c2231ad01476b33103 Mon Sep 17 00:00:00 2001 From: "Richard (Rick) Zamora" Date: Thu, 25 Jan 2024 12:35:18 -0600 Subject: [PATCH] Update dask "compute" benchmark (#50) * update dask benchmark to use aggregator API * rename file * update comment --- ...mpute_bench.py => dask_aggregate_bench.py} | 24 +++++++++---------- 1 file changed, 11 insertions(+), 13 deletions(-) rename examples/{dask_compute_bench.py => dask_aggregate_bench.py} (80%) diff --git a/examples/dask_compute_bench.py b/examples/dask_aggregate_bench.py similarity index 80% rename from examples/dask_compute_bench.py rename to examples/dask_aggregate_bench.py index ac90aa70..472dbd84 100644 --- a/examples/dask_compute_bench.py +++ b/examples/dask_aggregate_bench.py @@ -1,4 +1,4 @@ -# Copyright 2023 NVIDIA CORPORATION +# Copyright 2023-2024 NVIDIA CORPORATION # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -18,19 +18,19 @@ from dask_cuda import LocalCUDACluster from distributed import Client, LocalCluster -from crossfit.calculate.frame import MetricFrame -from crossfit.dask.calculate import calculate_per_col as calculate_dask -from crossfit.stats.continuous.stats import ContinuousStats +import crossfit as cf +from crossfit.backend.dask.aggregate import aggregate +from crossfit.metric.continuous.moments import Moments # Benchmark assumes Criteo dataset. # Low-cardinality columns: # {C6:4, C9:64, C13:11, C16:155, C17:4, C19:15, C25:109, C26:37} # Options -path = "/raid/dask-space/criteo/crit_pq_int" +path = "/datasets/rzamora/crit_pq_int" backend = "cudf" -split_row_groups = 10 -ncolumns = 10 +blocksize = "265MiB" +ncolumns = 4 groupby = None use_cluster = True @@ -55,21 +55,19 @@ columns += groupby if isinstance(groupby, list) else [groupby] ddf = dd.read_parquet( path, - split_row_groups=split_row_groups, + blocksize=blocksize, columns=columns, ) print(f"\nddf: {ddf}\n") - # Calculate continuous stats - metric = ContinuousStats() + # Aggregate moments (mean, var, std) + agg = cf.Aggregator(Moments(axis=0), per_column=True) t0 = time.time() - mf: MetricFrame = calculate_dask(metric, ddf, groupby=groupby) + result = aggregate(ddf, agg, to_frame=True) tf = time.time() print(f"\nWall Time: {tf-t0} seconds\n") # View result - assert isinstance(mf, MetricFrame) - result = mf.result() print(f"Result:\n{result}\n") print(f"Type: {type(result)}\n")