From 7aa03779efa65d471dc2403bd6d05117f2c24faa Mon Sep 17 00:00:00 2001
From: Arnaud Parant
Date: Tue, 20 Feb 2024 14:56:10 +0100
Subject: [PATCH] Add test_offline_sel + Improve README and scripts/elastic.py

---
 README.md                     |  8 ++---
 scripts/elastic.py            | 67 +++++++++++++++++++++++++++--------
 tests/docker-compose.yml      |  2 +-
 tests/test_offline_sel.py     | 28 +++++++++++++++
 tests/test_query_generator.py |  6 +++-
 tests/test_sel.py             |  6 +++-
 6 files changed, 96 insertions(+), 21 deletions(-)
 create mode 100644 tests/test_offline_sel.py

diff --git a/README.md b/README.md
index d1f513f..19a3a5e 100644
--- a/README.md
+++ b/README.md
@@ -16,7 +16,7 @@ Two first digits of SEL version match Elasticsearch version and then it's the in
 
 ## Compagny
 
-SEL was initially developed for Heuritech in 2016 and used by everybody inside the compagny tech and no-tech people since that time to explore internal data, generate reports and analysis.
+SEL was initially developed for Heuritech in 2015 and has been used ever since by everyone in the company, both technical and non-technical, to explore internal data and to generate reports and analyses.
 
 ## Quickstart
 
@@ -35,7 +35,7 @@ from sel.sel import SEL
 es = Elasticsearch(hosts="http://elasticsearch")
 sel = SEL(es)
 
-sel.search("my_index", "label = bag")
+sel.search("my_index", {"query": "label = bag"})
 ```
 
 ### SEL as ES query generator
@@ -45,7 +45,7 @@ from sel.sel import SEL
 es = Elasticsearch(hosts="http://elasticsearch")
 sel = SEL(es)
 
-sel.generate_query("label = bag", index="my_index")["elastic_query"]
+sel.generate_query({"query": "label = bag"}, index="my_index")["elastic_query"]
 ```
 
 ### SEL as offline ES query generator
@@ -53,7 +53,7 @@ sel.generate_query("label = bag", index="my_index")["elastic_query"]
 from sel.sel import SEL
 
 sel = SEL(None)
-sel.generate_query("label = bag", schema=my_index_schema)["elastic_query"]
+sel.generate_query({"query": "label = bag"}, schema=my_index_schema)["elastic_query"]
 ```
 
 ### SEL as API (SEL Server)
diff --git a/scripts/elastic.py b/scripts/elastic.py
index f5fe362..e28149b 100755
--- a/scripts/elastic.py
+++ b/scripts/elastic.py
@@ -1,13 +1,13 @@
-import os
+#!/usr/bin/python3
+
 import json
 import argparse
 import logging
 import time
+from itertools import islice
 
 from elasticsearch import Elasticsearch
 from elasticsearch.client import _normalize_hosts
 
-from sel import upload
-
 
 def options():
     parser = argparse.ArgumentParser()
@@ -15,11 +15,12 @@ def options():
     parser.add_argument("schema_filepath")
     parser.add_argument("index_name")
     parser.add_argument("--overwrite", action="store_true")
+    parser.add_argument("--hosts", nargs='+')
     return parser.parse_args()
 
 
-def create_index(filepath, schema_filepath, index, overwrite=False):
-    elastic = elastic_connect()
+def create_index(filepath, schema_filepath, index, overwrite=False, hosts=None):
+    elastic = elastic_connect(hosts=hosts)
 
     with open(filepath) as fd:
         data = loads_ndjson(fd)
@@ -42,10 +43,49 @@ def loads_ndjson(fd):
             yield json.loads(line)
 
 
+def _document_wrapper(index, documents, doc_type, id_getter, operation):
+    for doc in documents:
+
+        wrapper = {"action": {operation: {
+            "_index": index,
+            "_type": doc_type,
+            "_id": id_getter(doc)
+        }}}
+
+        if operation != "delete":
+            wrapper["source"] = doc
+
+        yield wrapper
+
+
+def _sender(elastic, bulk, operation):
+    body = [s for e in bulk for s in [e["action"], e.get("source")] if s is not None]
+    res = elastic.bulk(body=body, refresh=True)
+
+    failure = [i[operation] for i in res["items"] if "error" in i[operation]]
+    if failure:
+        raise Exception(str(failure))
+
+
+def _manager(elastic, documents, size, operation):
+
+    while True:
+        bulk = list(islice(documents, size))
+        if not bulk:
+            break
+
+        _sender(elastic, bulk, operation)
+
+
+def bulk(elastic, index, doc_type, documents, id_getter, bulk_size=100, operation="index"):
+    docs = _document_wrapper(index, documents, doc_type, id_getter, operation)
+    _manager(elastic, docs, bulk_size, operation)
+
+
 def insert(elastic, index, data):
     logging.info("Start insertion ...")
     id_getter = lambda d: d["id"]
-    upload.bulk(elastic, index, "document", data, id_getter)
+    bulk(elastic, index, "document", data, id_getter)
     logging.info("Done")
 
@@ -62,17 +102,13 @@ def load_schema(filepath):
         return json.load(fd)
 
 
-def elastic_connect():
+def elastic_connect(hosts=None):
     """ Create new elastic connection """
-    es_hosts = os.environ["ES_HOST"].split(",")
+    es_hosts = hosts if hosts else ["http://localhost:9200"]
     kwargs = {
         "hosts": _normalize_hosts(es_hosts),
         "retry_on_timeout": True,
-        "timeout": 30,
-        "sniff_on_start": True,
-        "sniff_on_connection_fail": True,
-        "sniff_timeout": 10,
-        "sniffer_timeout": 60,
+        "timeout": 30
     }
     return Elasticsearch(**kwargs)
 
@@ -80,4 +116,7 @@ def elastic_connect():
 
 if __name__ == "__main__":
     args = options()
-    create_index(args.filepath, args.schema_filepath, args.index_name, overwrite=args.overwrite)
+    create_index(
+        args.filepath, args.schema_filepath, args.index_name,
+        overwrite=args.overwrite, hosts=args.hosts
+    )
diff --git a/tests/docker-compose.yml b/tests/docker-compose.yml
index dc6d33e..c9972a2 100644
--- a/tests/docker-compose.yml
+++ b/tests/docker-compose.yml
@@ -5,7 +5,7 @@ services:
     image: sel_test:${BUILD_TAG}
     command: bash -c "sleep infinity"
     environment:
-      - "ES_HOST=http://elasticsearch"
+      - "ES_HOSTS=http://elasticsearch"
     depends_on:
       elasticsearch:
         condition: service_healthy
diff --git a/tests/test_offline_sel.py b/tests/test_offline_sel.py
new file mode 100644
index 0000000..1f6ff58
--- /dev/null
+++ b/tests/test_offline_sel.py
@@ -0,0 +1,28 @@
+import json
+import pytest
+import logging
+
+from sel.sel import SEL
+
+
+@pytest.fixture(scope="session")
+def osel():
+    return SEL(None, log_level=logging.DEBUG)
+
+
+def load_schema():
+    with open("scripts/schema.json", "r") as fd:
+        return json.load(fd)["mappings"]
+
+
+class TestOfflineSEL():
+
+    def test_generator(self, osel):
+        query = {"query": "label = bag"}
+        res = osel.generate_query(query, schema=load_schema())
+        assert res == {
+            'warns': [],
+            'elastic_query': {'query': {'bool': {'must': [{'nested': {'path': 'media.label', 'query': {'term': {'media.label.name': 'bag'}}}}], 'must_not': [{'term': {'deleted': True}}]}}, 'sort': [{'deleted': {'order': 'desc', 'nested_filter': {'bool': {'must_not': [{'term': {'deleted': True}}]}}}}, {'media.label.score': {'order': 'desc', 'nested_path': 'media.label', 'nested_filter': {'term': {'media.label.name': 'bag'}}}}]},
+            'internal_query': {'query': {'operator': 'and', 'items': [{'field': '.deleted', 'comparator': '!=', 'value': True}, {'field': 'label', 'comparator': '=', 'value': 'bag'}]}},
+            'query_data': {}
+        }
diff --git a/tests/test_query_generator.py b/tests/test_query_generator.py
index e5c8773..e9d6ba1 100644
--- a/tests/test_query_generator.py
+++ b/tests/test_query_generator.py
@@ -1,4 +1,5 @@
 import pytest
+import os
 
 from scripts import elastic
 
@@ -11,13 +12,16 @@
 TEST_INDEX_FILE = "/tests/data/sample_2017.json"
 TEST_SCHEMA_FILE = "/scripts/schema.json"
 TEST_INDEX = "test_index"
+ES_HOSTS = os.environ["ES_HOSTS"].split(",")
 
 
 class TestQueryGenerator:
 
     @pytest.fixture(scope="session", autouse=True)
     def init(self):
-        elastic.create_index(TEST_INDEX_FILE, TEST_SCHEMA_FILE, TEST_INDEX, overwrite=True)
+        elastic.create_index(
+            TEST_INDEX_FILE, TEST_SCHEMA_FILE, TEST_INDEX, hosts=ES_HOSTS, overwrite=True
+        )
 
     @pytest.mark.parametrize(["query", "expected_total"], [
diff --git a/tests/test_sel.py b/tests/test_sel.py
index e64f837..8dab9ba 100644
--- a/tests/test_sel.py
+++ b/tests/test_sel.py
@@ -1,5 +1,6 @@
 import pytest
 import json
+import os
 
 from scripts import elastic
 
@@ -10,13 +11,16 @@
 TEST_INDEX_FILE = "/tests/data/sample_2017.json"
 TEST_SCHEMA_FILE = "/scripts/schema.json"
 TEST_INDEX = "test_index"
+ES_HOSTS = os.environ["ES_HOSTS"].split(",")
 
 
 class TestSEL:
 
     @pytest.fixture(scope="function", autouse=True)
     def init(self):
-        elastic.create_index(TEST_INDEX_FILE, TEST_SCHEMA_FILE, TEST_INDEX, overwrite=True)
+        elastic.create_index(
+            TEST_INDEX_FILE, TEST_SCHEMA_FILE, TEST_INDEX, hosts=ES_HOSTS, overwrite=True
+        )
 
     def __cleaner(self, obj):
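
Usage note on the refactor above: a minimal sketch of how the new bulk helpers in scripts/elastic.py chain together once the patch is applied. It assumes an Elasticsearch reachable at localhost:9200; "demo_index", the sample documents, and the id getter are illustrative, not part of the patch.

```python
from scripts import elastic

# elastic_connect() now takes hosts directly instead of reading ES_HOST
# from the environment, and falls back to http://localhost:9200.
es = elastic.elastic_connect(hosts=["http://localhost:9200"])

# Hypothetical documents; bulk() only requires that id_getter can extract
# a unique id from each one.
docs = [{"id": 1, "label": "bag"}, {"id": 2, "label": "shoe"}]

# bulk() wraps each document in a bulk action (_document_wrapper),
# _manager slices that generator into chunks of bulk_size, and _sender
# posts each chunk with refresh=True, raising if any item reports an "error".
elastic.bulk(es, "demo_index", "document", docs, lambda d: d["id"], bulk_size=50)
```

The command-line equivalent (file names illustrative) would be: ./scripts/elastic.py data.ndjson schema.json demo_index --hosts http://localhost:9200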