Skip to content
This repository has been archived by the owner on Apr 3, 2024. It is now read-only.

Commit

Permalink
Add test_offline_sel + Improve README and scripts/elastic.py
Browse files Browse the repository at this point in the history
  • Loading branch information
ArnaudParant committed Feb 20, 2024
1 parent d70c69e commit 7aa0377
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 21 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ Two first digits of SEL version match Elasticsearch version and then it's the in


## Company
SEL was initially developed for Heuritech in 2016 and used by everybody inside the company, by tech and non-tech people, since that time to explore internal data, generate reports and analysis.
SEL was initially developed for Heuritech in 2015 and used by everybody inside the company, by tech and non-tech people, since that time to explore internal data, generate reports and analysis.


## Quickstart
Expand All @@ -35,7 +35,7 @@ from sel.sel import SEL
es = Elasticsearch(hosts="http://elasticsearch")
sel = SEL(es)
sel.search("my_index", "label = bag")
sel.search("my_index", {"query": "label = bag"})
```

### SEL as ES query generator
Expand All @@ -45,15 +45,15 @@ from sel.sel import SEL
es = Elasticsearch(hosts="http://elasticsearch")
sel = SEL(es)
sel.generate_query("label = bag", index="my_index")["elastic_query"]
sel.generate_query({"query": "label = bag"}, index="my_index")["elastic_query"]
```

### SEL as offline ES query generator
```
from sel.sel import SEL
sel = SEL(None)
sel.generate_query("label = bag", schema=my_index_schema)["elastic_query"]
sel.generate_query({"query": "label = bag"}, schema=my_index_schema)["elastic_query"]
```

### SEL as API (SEL Server)
Expand Down
67 changes: 53 additions & 14 deletions scripts/elastic.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,26 @@
import os
#!/usr/bin/python3

import json
import argparse
import logging
import time
from itertools import islice
from elasticsearch import Elasticsearch
from elasticsearch.client import _normalize_hosts

from sel import upload


def options():
    """Parse command-line arguments for the index-creation script.

    Returns:
        argparse.Namespace with:
            filepath: path to the ndjson file of documents to index.
            schema_filepath: path to the JSON index schema/mapping file.
            index_name: name of the target Elasticsearch index.
            overwrite: drop a pre-existing index first when True.
            hosts: list of Elasticsearch host URLs, or None.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("filepath",
                        help="Path to the ndjson file with documents to index")
    parser.add_argument("schema_filepath",
                        help="Path to the JSON file with the index schema")
    parser.add_argument("index_name",
                        help="Name of the Elasticsearch index to create")
    parser.add_argument("--overwrite", action="store_true",
                        help="Delete the index first if it already exists")
    parser.add_argument("--hosts", nargs='+',
                        help="Elasticsearch host URLs")
    return parser.parse_args()


def create_index(filepath, schema_filepath, index, overwrite=False):
elastic = elastic_connect()
def create_index(filepath, schema_filepath, index, overwrite=False, hosts=None):
elastic = elastic_connect(hosts=hosts)

with open(filepath) as fd:
data = loads_ndjson(fd)
Expand All @@ -42,10 +43,49 @@ def loads_ndjson(fd):
yield json.loads(line)


def _document_wrapper(index, documents, doc_type, id_getter, operation):
for doc in documents:

wrapper = {"action": {operation: {
"_index": index,
"_type": doc_type,
"_id": id_getter(doc)
}}}

if operation != "delete":
wrapper["source"] = doc

yield wrapper


def _sender(elastic, bulk, operation):
body = [s for e in bulk for s in [e["action"], e.get("source")] if s is not None]
res = elastic.bulk(body=body, refresh=True)

failure = [i[operation] for i in res["items"] if "error" in i[operation]]
if failure:
raise Exception(str(failure))


def _manager(elastic, documents, size, operation):
    """Consume `documents` in batches of at most `size` and ship each
    batch to Elasticsearch via `_sender`."""
    # iter() with the empty-list sentinel stops as soon as the document
    # iterator is exhausted — same behavior as the original while loop.
    for batch in iter(lambda: list(islice(documents, size)), []):
        _sender(elastic, batch, operation)


def bulk(elastic, index, doc_type, documents, id_getter, bulk_size=100, operation="index"):
    """Run an Elasticsearch bulk `operation` over `documents`.

    `id_getter` maps a document to its `_id`; documents are wrapped into
    bulk-API entries and sent in batches of `bulk_size`.
    """
    wrapped = _document_wrapper(index, documents, doc_type, id_getter, operation)
    _manager(elastic, wrapped, bulk_size, operation)


def insert(elastic, index, data):
logging.info("Start insertion ...")
id_getter = lambda d: d["id"]
upload.bulk(elastic, index, "document", data, id_getter)
bulk(elastic, index, "document", data, id_getter)
logging.info("Done")


Expand All @@ -62,22 +102,21 @@ def load_schema(filepath):
return json.load(fd)


def elastic_connect():
def elastic_connect(hosts=None):
""" Create new elastic connection """
es_hosts = os.environ["ES_HOST"].split(",")
es_hosts = hosts if hosts else ["http://localhost:9200"]
kwargs = {
"hosts": _normalize_hosts(es_hosts),
"retry_on_timeout": True,
"timeout": 30,
"sniff_on_start": True,
"sniff_on_connection_fail": True,
"sniff_timeout": 10,
"sniffer_timeout": 60,
"timeout": 30
}

return Elasticsearch(**kwargs)


if __name__ == "__main__":
args = options()
create_index(args.filepath, args.schema_filepath, args.index_name, overwrite=args.overwrite)
create_index(
args.filepath, args.schema_filepath, args.index_name,
overwrite=args.overwrite, hosts=args.hosts
)
2 changes: 1 addition & 1 deletion tests/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ services:
image: sel_test:${BUILD_TAG}
command: bash -c "sleep infinity"
environment:
- "ES_HOST=http://elasticsearch"
- "ES_HOSTS=http://elasticsearch"
depends_on:
elasticsearch:
condition: service_healthy
Expand Down
28 changes: 28 additions & 0 deletions tests/test_offline_sel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import json
import pytest
import logging

from sel.sel import SEL


@pytest.fixture(scope="session")
def osel():
    # Offline SEL instance: no Elasticsearch client, used purely as a
    # query generator against a schema passed in by the tests.
    return SEL(None, log_level=logging.DEBUG)


def load_schema():
    """Read the test schema file and return its "mappings" section."""
    with open("scripts/schema.json", "r") as schema_fd:
        schema = json.load(schema_fd)
    return schema["mappings"]


class TestOfflineSEL():
    # Tests SEL used offline (no Elasticsearch connection): query
    # generation driven entirely by a schema loaded from disk.

    def test_generator(self, osel):
        # Pins the full generate_query contract for a simple query:
        # the generated elastic query (with its sort clauses), the parsed
        # internal query, empty warns and empty query_data.
        query = {"query": "label = bag"}
        res = osel.generate_query(query, schema=load_schema())
        assert res == {
            'warns': [],
            'elastic_query': {'query': {'bool': {'must': [{'nested': {'path': 'media.label', 'query': {'term': {'media.label.name': 'bag'}}}}], 'must_not': [{'term': {'deleted': True}}]}}, 'sort': [{'deleted': {'order': 'desc', 'nested_filter': {'bool': {'must_not': [{'term': {'deleted': True}}]}}}}, {'media.label.score': {'order': 'desc', 'nested_path': 'media.label', 'nested_filter': {'term': {'media.label.name': 'bag'}}}}]},
            'internal_query': {'query': {'operator': 'and', 'items': [{'field': '.deleted', 'comparator': '!=', 'value': True}, {'field': 'label', 'comparator': '=', 'value': 'bag'}]}},
            'query_data': {}
        }
6 changes: 5 additions & 1 deletion tests/test_query_generator.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import pytest
import os

from scripts import elastic

Expand All @@ -11,13 +12,16 @@
TEST_INDEX_FILE = "/tests/data/sample_2017.json"
TEST_SCHEMA_FILE = "/scripts/schema.json"
TEST_INDEX = "test_index"
ES_HOSTS = os.environ["ES_HOSTS"].split(",")


class TestQueryGenerator:

@pytest.fixture(scope="session", autouse=True)
def init(self):
elastic.create_index(TEST_INDEX_FILE, TEST_SCHEMA_FILE, TEST_INDEX, overwrite=True)
elastic.create_index(
TEST_INDEX_FILE, TEST_SCHEMA_FILE, TEST_INDEX, hosts=ES_HOSTS, overwrite=True
)


@pytest.mark.parametrize(["query", "expected_total"], [
Expand Down
6 changes: 5 additions & 1 deletion tests/test_sel.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import pytest
import json
import os

from scripts import elastic

Expand All @@ -10,13 +11,16 @@
TEST_INDEX_FILE = "/tests/data/sample_2017.json"
TEST_SCHEMA_FILE = "/scripts/schema.json"
TEST_INDEX = "test_index"
ES_HOSTS = os.environ["ES_HOSTS"].split(",")


class TestSEL:

@pytest.fixture(scope="function", autouse=True)
def init(self):
elastic.create_index(TEST_INDEX_FILE, TEST_SCHEMA_FILE, TEST_INDEX, overwrite=True)
elastic.create_index(
TEST_INDEX_FILE, TEST_SCHEMA_FILE, TEST_INDEX, hosts=ES_HOSTS, overwrite=True
)


def __cleaner(self, obj):
Expand Down

0 comments on commit 7aa0377

Please sign in to comment.