Skip to content

Commit

Permalink
Update create track (elastic#1846)
Browse files Browse the repository at this point in the history
* Add batch_size and update track layout

* Update docs

* Fix doc formatting and missing hard coded batch size

* More typos
  • Loading branch information
gareth-ellis authored May 23, 2024
1 parent 659ab11 commit 31a0eb7
Show file tree
Hide file tree
Showing 9 changed files with 105 additions and 41 deletions.
8 changes: 8 additions & 0 deletions docs/adding_tracks.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,31 @@ The track generator will create a folder with the track's name in the specified

> find tracks/acme
tracks/acme
tracks/acme/challenges
tracks/acme/challenges/default.json
tracks/acme/companies-documents.json
tracks/acme/companies-documents.json.bz2
tracks/acme/companies-documents-1k.json
tracks/acme/companies-documents-1k.json.bz2
tracks/acme/companies.json
tracks/acme/operations
tracks/acme/operations/default.json
tracks/acme/products-documents.json
tracks/acme/products-documents.json.bz2
tracks/acme/products-documents-1k.json
tracks/acme/products-documents-1k.json.bz2
tracks/acme/products.json
tracks/acme/track.json


The files are organized as follows:

* ``track.json`` contains the actual Rally track. For details see the :doc:`track reference<track>`.
* ``companies.json`` and ``products.json`` contain the mapping and settings for the extracted indices.
* ``*-documents.json(.bz2)`` contains the sources of all the documents from the extracted indices. The files suffixed with ``-1k`` contain a smaller version of the document corpus to support :ref:`test mode <add_track_test_mode>`.
* ``operations/default.json`` contains a sample operation that can be used when adding search requests to the track.
* ``challenges/default.json`` contains the initial actions for the default challenge: delete the index, re-create it, and bulk-index the documents.


Creating a track from scratch
-----------------------------
Expand Down
15 changes: 15 additions & 0 deletions docs/command_line_reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1002,6 +1002,21 @@ Use index patterns::
.. note::
If the cluster requires authentication specify credentials via ``--client-options`` as described in the :ref:`command line reference <clr_client_options>`.

``batch-size``
~~~~~~~~~~~~~~~~

The number of records to request per batch when generating a track with the ``create-track`` subcommand. The default value is 1000.

**Example**

Run with 10000 records per batch::

esrally create-track --track=acme --batch-size=10000 --target-hosts=127.0.0.1:9200 --output-path=~/tracks


.. note::
The larger the batch size, the more data is downloaded per call to Elasticsearch. As such, you should ensure that there is a stable network connection between the host running Rally and the Elasticsearch cluster.

.. _command_line_reference_advanced_topics:

Advanced topics
Expand Down
9 changes: 8 additions & 1 deletion esrally/rally.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ def add_track_source(subparser):
add_track_source(info_parser)
info_parser.add_argument(
"--track",
help=f"Define the track to use. List possible tracks with `{PROGRAM_NAME} list tracks`."
help=f"Define the track to use. List possible tracks with `{PROGRAM_NAME} list tracks`.",
# we set the default value later on because we need to determine whether the user has provided this value.
# default="geonames"
)
Expand Down Expand Up @@ -229,6 +229,11 @@ def add_track_source(subparser):
required=True,
help="Name of the generated track",
)
create_track_parser.add_argument(
    "--batch-size",
    # argparse delivers CLI values as strings; without type=int a user-supplied
    # value would reach corpus.dump_documents as a str and break the
    # `total_docs // batch_size` arithmetic there.
    type=int,
    default=1000,
    help="Number of documents to collect per call to Elasticsearch.",
)
indices_or_data_streams_group = create_track_parser.add_mutually_exclusive_group(required=True)
indices_or_data_streams_group.add_argument(
"--indices",
Expand Down Expand Up @@ -1188,11 +1193,13 @@ def dispatch_sub_command(arg_parser, args, cfg: types.Config):
cfg.add(config.Scope.applicationOverride, "generator", "data_streams", args.data_streams)
cfg.add(config.Scope.applicationOverride, "generator", "output.path", args.output_path)
cfg.add(config.Scope.applicationOverride, "track", "track.name", args.track)
cfg.add(config.Scope.applicationOverride, "generator", "batch_size", args.batch_size)
elif args.indices is not None:
cfg.add(config.Scope.applicationOverride, "generator", "indices", args.indices)
cfg.add(config.Scope.applicationOverride, "generator", "data_streams", args.data_streams)
cfg.add(config.Scope.applicationOverride, "generator", "output.path", args.output_path)
cfg.add(config.Scope.applicationOverride, "track", "track.name", args.track)
cfg.add(config.Scope.applicationOverride, "generator", "batch_size", args.batch_size)
configure_connection_params(arg_parser, args, cfg)

tracker.create_track(cfg)
Expand Down
35 changes: 35 additions & 0 deletions esrally/resources/challenges.json.j2
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
{
"name": "my-challenge",
"description": "My new challenge",
"default": true,
"schedule": [
{
"operation": "delete-index"
},{% raw %}
{
"operation": {
"operation-type": "create-index",
"settings": {{index_settings | default({}) | tojson}}
}
},{% endraw %}
{
"operation": {
"operation-type": "cluster-health",
"index": {{ indices | map(attribute='name') | list | join(',') | tojson }},{% raw %}
"request-params": {
"wait_for_status": "{{cluster_health | default('green')}}",
"wait_for_no_relocating_shards": "true"
},
"retry-until-success": true
}
},
{
"operation": {
"operation-type": "bulk",
"bulk-size": {{bulk_size | default(5000)}},
"ingest-percentage": {{ingest_percentage | default(100)}}
},
"clients": {{bulk_indexing_clients | default(8)}}
}
]{% endraw %}
}
11 changes: 11 additions & 0 deletions esrally/resources/operations.json.j2
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{% raw %}
{
"name": "default",
"operation-type": "search",
"body": {
"query": {
"match_all": {}
}
}
}
{% endraw %}
38 changes: 8 additions & 30 deletions esrally/resources/track.json.j2
Original file line number Diff line number Diff line change
Expand Up @@ -22,34 +22,12 @@
]
}{% endfor %}
],
"schedule": [
{
"operation": "delete-index"
},{% raw %}
{
"operation": {
"operation-type": "create-index",
"settings": {{index_settings | default({}) | tojson}}
}
},{% endraw %}
{
"operation": {
"operation-type": "cluster-health",
"index": {{ indices | map(attribute='name') | list | join(',') | tojson }},{% raw %}
"request-params": {
"wait_for_status": "{{cluster_health | default('green')}}",
"wait_for_no_relocating_shards": "true"
},
"retry-until-success": true
}
},
{
"operation": {
"operation-type": "bulk",
"bulk-size": {{bulk_size | default(5000)}},
"ingest-percentage": {{ingest_percentage | default(100)}}
},
"clients": {{bulk_indexing_clients | default(8)}}
}
]{% endraw %}
{% raw %}
"operations": [
{{ rally.collect(parts="operations/*.json") }}
],
"challenges": [
{{ rally.collect(parts="challenges/*.json") }}
]
{% endraw %}
}
12 changes: 6 additions & 6 deletions esrally/tracker/corpus.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def get_doc_outpath(outdir, name, suffix=""):
return os.path.join(outdir, f"{name}-documents{suffix}.json")


def extract(client, output_path, index):
def extract(client, output_path, index, batch_size=1000):
"""
Scroll an index with a match-all query, dumping document source to ``outdir/documents.json``.
Expand All @@ -58,20 +58,20 @@ def extract(client, output_path, index):
if total_docs > 0:
logger.info("[%d] total docs in index [%s].", total_docs, index)
docs_path = get_doc_outpath(output_path, index)
dump_documents(client, index, get_doc_outpath(output_path, index, "-1k"), min(total_docs, 1000), " for test mode")
dump_documents(client, index, docs_path, total_docs)
dump_documents(client, index, get_doc_outpath(output_path, index, "-1k"), min(total_docs, 1000), batch_size, " for test mode")
dump_documents(client, index, docs_path, total_docs, batch_size)
return template_vars(index, docs_path, total_docs)
else:
logger.info("Skipping corpus extraction fo index [%s] as it contains no documents.", index)
return None


def dump_documents(client, index, out_path, total_docs, progress_message_suffix=""):
def dump_documents(client, index, out_path, total_docs, batch_size=1000, progress_message_suffix=""):
# pylint: disable=import-outside-toplevel
from elasticsearch import helpers

logger = logging.getLogger(__name__)
freq = max(1, total_docs // 1000)
freq = max(1, total_docs // batch_size)

progress = console.progress()
compressor = DOCS_COMPRESSOR()
Expand All @@ -80,7 +80,7 @@ def dump_documents(client, index, out_path, total_docs, progress_message_suffix=
with open(comp_outpath, "wb") as comp_outfile:
logger.info("Dumping corpus for index [%s] to [%s].", index, out_path)
query = {"query": {"match_all": {}}}
for n, doc in enumerate(helpers.scan(client, query=query, index=index)):
for n, doc in enumerate(helpers.scan(client, query=query, index=index, size=batch_size)):
if n >= total_docs:
break
data = (json.dumps(doc["_source"], separators=(",", ":")) + "\n").encode("utf-8")
Expand Down
17 changes: 13 additions & 4 deletions esrally/tracker/tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def extract_indices_from_data_streams(client, data_streams_to_extract):
return indices


def extract_mappings_and_corpora(client, output_path, indices_to_extract):
def extract_mappings_and_corpora(client, output_path, indices_to_extract, batch_size):
indices = []
corpora = []
# first extract index metadata (which is cheap) and defer extracting data to reduce the potential for
Expand All @@ -61,7 +61,7 @@ def extract_mappings_and_corpora(client, output_path, indices_to_extract):

# That list only contains valid indices (with index patterns already resolved)
for i in indices:
c = corpus.extract(client, output_path, i["name"])
c = corpus.extract(client, output_path, i["name"], batch_size)
if c:
corpora.append(c)

Expand All @@ -77,6 +77,7 @@ def create_track(cfg: types.Config):
target_hosts = cfg.opts("client", "hosts").default
client_options = cfg.opts("client", "options").default
data_streams = cfg.opts("generator", "data_streams")
batch_size = cfg.opts("generator", "batch_size")

distribution_flavor, distribution_version, _, _ = factory.cluster_distribution_version(target_hosts, client_options)
client = factory.EsClientFactory(
Expand All @@ -90,22 +91,30 @@ def create_track(cfg: types.Config):

output_path = os.path.abspath(os.path.join(io.normalize_path(root_path), track_name))
io.ensure_dir(output_path)
challenge_path = os.path.abspath(os.path.join(output_path, "challenges"))
io.ensure_dir(challenge_path)
operations_path = os.path.abspath(os.path.join(output_path, "operations"))
io.ensure_dir(operations_path)

if data_streams is not None:
logger.info("Creating track [%s] matching data streams [%s]", track_name, data_streams)
extracted_indices = extract_indices_from_data_streams(client, data_streams)
indices = extracted_indices
logger.info("Creating track [%s] matching indices [%s]", track_name, indices)

indices, corpora = extract_mappings_and_corpora(client, output_path, indices)
indices, corpora = extract_mappings_and_corpora(client, output_path, indices, batch_size)
if len(indices) == 0:
raise RuntimeError("Failed to extract any indices for track!")

template_vars = {"track_name": track_name, "indices": indices, "corpora": corpora}

track_path = os.path.join(output_path, "track.json")
default_challenges = os.path.join(challenge_path, "default.json")
default_operations = os.path.join(operations_path, "default.json")
templates_path = os.path.join(cfg.opts("node", "rally.root"), "resources")
process_template(templates_path, "track.json.j2", template_vars, track_path)
process_template(templates_path, "challenges.json.j2", template_vars, default_challenges)
process_template(templates_path, "operations.json.j2", template_vars, default_operations)

console.println("")
console.info(f"Track {track_name} has been created. Run it with: {PROGRAM_NAME} --track-path={output_path}")
console.info(f"Track {track_name} has been created. Run it with: {PROGRAM_NAME} race --track-path={output_path}")
1 change: 1 addition & 0 deletions esrally/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
"assertions",
"async.debug",
"available.cores",
"batch_size",
"build.type",
"cache",
"cache.days",
Expand Down

0 comments on commit 31a0eb7

Please sign in to comment.