PDF Parse Workflow with Docling and Elastic Search (#1137)
stangirala authored Jan 2, 2025
1 parent 25303e2 commit 498dc94
Showing 9 changed files with 346 additions and 17 deletions.
10 changes: 9 additions & 1 deletion examples/pdf_document_extraction/README.md
@@ -1,6 +1,7 @@
# PDF Document Extraction and Indexing

The example builds a pipeline that extracts text, tables, and figures from a PDF document. It embeds the text, tables, and images from the document and writes them into ChromaDB.
This example also provides an alternate, OSS-friendly approach that uses Docling for document parsing and ElasticSearch as the vector store.

The pipeline is hosted on a server endpoint in one of the containers. The endpoint can be called from any Python application.

@@ -12,6 +13,8 @@ docker compose up

## Deploy the Graph
The [Graph](workflow.py) has all the code that performs PDF parsing, embedding, and writing to the vector DB. We will deploy the Graph on the server to create an endpoint for our workflow.
Make sure to deploy the right graph before running the example.

```bash
pip install indexify
```
@@ -34,7 +37,7 @@ invocation_id = graph.run(block_until_done=True, url="")
```

## Outputs
You can read the output of every function of the Graph.
You can read the output of every function of the Graph. For example,

```python
chunks = graph.output(invocation_id, "chunk_text")
@@ -43,10 +46,15 @@ chunks = graph.output(invocation_id, "chunk_text")
The ChromaDB tables are populated automatically by the [ChromaDBWriter](https://github.com/tensorlakeai/indexify/blob/main/examples/pdf_document_extraction/chromadb_writer.py) class.
The names of the databases used in the example are `text_embeddings` and `image_embeddings`. The database running inside the container at port `8000` is forwarded to the host for convenience.

For ElasticSearch, the service in this example is set up via `docker-compose.yaml`. `elastic_writer.py` relies on Docker networking to connect to it and index the generated vectors.

## Vector Search

Once the documents are processed, you can query ChromaDB for vector search. Here is some [sample code for that](https://github.com/tensorlakeai/indexify/blob/main/examples/pdf_document_extraction/retreive.py).
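
For a quick check without that script, a minimal sketch along these lines should work (the embedding model name below is an assumption; substitute whichever model the pipeline's text embedding step actually uses):

```python
import chromadb
from sentence_transformers import SentenceTransformer

# Connect to the ChromaDB instance forwarded to the host on port 8000.
client = chromadb.HttpClient(host="localhost", port=8000)
collection = client.get_collection("text_embeddings")

# Embed the query text; the model here is an assumption and must match the pipeline's.
model = SentenceTransformer("sentence-transformers/all-mpnet-base-v2")
query_embedding = model.encode("What is the document about?").tolist()

# Nearest-neighbour lookup over the stored text embeddings.
results = collection.query(query_embeddings=[query_embedding], n_results=5)
print(results)
```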

For ElasticSearch, `es_retrieve.py` has sample Python code to query the indices.
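
For illustration, a rough sketch of such a query with the official Python client (again, the embedding model is an assumption; it must produce vectors matching the 768-dimension `text_embeddings` mapping created by `elastic_writer.py`):

```python
from elasticsearch import Elasticsearch
from sentence_transformers import SentenceTransformer

# From the host, Elasticsearch is reachable on the forwarded port 9200.
client = Elasticsearch(hosts=["http://localhost:9200"])

# Embed the query text; all-mpnet-base-v2 is an assumption that happens to produce 768-dim vectors.
model = SentenceTransformer("sentence-transformers/all-mpnet-base-v2")
query_vector = model.encode("What is the document about?").tolist()

# Approximate kNN search against the dense_vector field defined in the index mapping.
response = client.search(
    index="text_embeddings",
    knn={
        "field": "embedding",
        "query_vector": query_vector,
        "k": 5,
        "num_candidates": 50,
    },
    source=["chunk", "page_number"],
)

for hit in response["hits"]["hits"]:
    print(hit["_score"], hit["_source"]["chunk"])
```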

## Customization

Copy the folder, modify the code as you like, and upload the new Graph.
8 changes: 7 additions & 1 deletion examples/pdf_document_extraction/common_objects.py
@@ -12,4 +12,10 @@ class TextChunk(BaseModel):
class ImageWithEmbedding(BaseModel):
    embedding: List[float]
    image_bytes: bytes
    page_number: int


# Docling Example Objects
class PDFParserDoclingOutput(BaseModel):
    texts: List[str]
    images: List[str]
36 changes: 36 additions & 0 deletions examples/pdf_document_extraction/docker-compose.yaml
@@ -57,6 +57,19 @@ services:
      server:
    volumes:
      - data:/tmp/indexify-blob-storage
  default-executor:
    image: tensorlake/indexify-executor-default
    command:
      [
        "indexify-cli",
        "executor",
        "--server-addr",
        "indexify:8900"
      ]
    networks:
      server:
    volumes:
      - data:/tmp/indexify-blob-storage
  pdf-parser-executor:
    # Use this for GPU support
    image: tensorlake/pdf-blueprint-pdf-parser-gpu:latest
@@ -92,6 +105,29 @@ services:
    ports:
      - 8000:8000

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.17.0
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
      - "ES_JAVA_OPTS=-Xms4g -Xmx4g"
    networks:
      server:
    ports:
      - "9200:9200"
    volumes:
      - esdata:/usr/share/elasticsearch/data
    ulimits:
      memlock:
        soft: -1
        hard: -1
    deploy:
      mode: replicated
      replicas: 1
      resources:
        limits:
          memory: 8G

volumes:
  data:
  esdata:
135 changes: 135 additions & 0 deletions examples/pdf_document_extraction/elastic_writer.py
@@ -0,0 +1,135 @@
from elastic_transport import ApiError
from elasticsearch import Elasticsearch
from typing import Union
import base64
import uuid

from common_objects import ImageWithEmbedding, TextChunk
from indexify.functions_sdk.indexify_functions import IndexifyFunction

from images import st_image


class ElasticSearchWriter(IndexifyFunction):
    name = "elastic_search_writer"
    image = st_image

    def __init__(self):
        super().__init__()
        # Connect to Elasticsearch
        self._client = Elasticsearch(
            hosts=["http://elasticsearch:9200"], # <User Change>: default is service name in the docker compose file.
            verify_certs=False,
            ssl_show_warn=False,
            #basic_auth=("elastic", "your_password"),
            retry_on_timeout=True,
            max_retries=3,
            request_timeout=5,
        )

        # Create indices if they don't exist
        self._create_indices_if_not_exists()

    def _create_indices_if_not_exists(self):
        # Text index mapping
        text_mapping = {
            "mappings": {
                "properties": {
                    "embedding": {
                        "type": "dense_vector",
                        "dims": 768,
                        "index": True,
                        "similarity": "cosine",
                        "index_options": {
                            "type": "hnsw",
                            "m": 16,
                            "ef_construction": 100
                        }
                    },
                    "page_number": {"type": "integer"},
                    "chunk": {"type": "text"}
                }
            },
            "settings": {
                "number_of_shards": 1,
                "number_of_replicas": 0,
            },
        }

        # Image index mapping
        image_mapping = {
            "mappings": {
                "properties": {
                    "embedding": {
                        "type": "dense_vector",
                        "dims": 512,
                        "index": True,
                        "similarity": "cosine",
                        "index_options": {
                            "type": "hnsw",
                            "m": 16,
                            "ef_construction": 100
                        }
                    },
                    "page_number": {"type": "integer"},
                    "image_data": {"type": "binary"}
                }
            },
            "settings": {
                "number_of_shards": 1,
                "number_of_replicas": 0,
            },
        }

        try:
            self._client.indices.create(index="text_embeddings", body=text_mapping)
        except ApiError as e:
            if e.status_code == 400 and "resource_already_exists_exception" in str(e):
                print("Text index already exists. Continuing.")
            else:
                raise e

        try:
            self._client.indices.create(index="image_embeddings", body=image_mapping)
        except ApiError as e:
            if e.status_code == 400 and "resource_already_exists_exception" in str(e):
                print("Image index already exists. Continuing.")
            else:
                raise e

    def run(self, input: Union[ImageWithEmbedding, TextChunk]) -> bool:
        try:
            if isinstance(input, ImageWithEmbedding):
                # Convert image bytes to base64 for storage
                image_base64 = base64.b64encode(input.image_bytes).decode('utf-8')

                document = {
                    "embedding": input.embedding,
                    "page_number": input.page_number,
                    "image_data": image_base64
                }

                self._client.index(
                    index="image_embeddings",
                    id=str(uuid.uuid4()),
                    document=document
                )

            elif isinstance(input, TextChunk):
                document = {
                    "embedding": input.embeddings,
                    "page_number": input.page_number,
                    "chunk": input.chunk
                }

                self._client.index(
                    index="text_embeddings",
                    id=str(uuid.uuid4()),
                    document=document
                )

            return True

        except Exception as e:
            print(f"Error indexing document: {str(e)}")
            return False
50 changes: 49 additions & 1 deletion examples/pdf_document_extraction/embedding.py
@@ -2,7 +2,7 @@

from indexify.functions_sdk.indexify_functions import IndexifyFunction, indexify_function
from sentence_transformers import SentenceTransformer
from common_objects import ImageWithEmbedding, TextChunk
from common_objects import ImageWithEmbedding, TextChunk, PDFParserDoclingOutput
from inkwell.api.document import Document
from inkwell.api.page import PageFragmentType
import base64
@@ -40,6 +40,24 @@ def chunk_text(document: Document) -> List[TextChunk]:
    return chunks


@indexify_function(image=st_image)
def chunk_text_docling(document: PDFParserDoclingOutput) -> List[TextChunk]:
    """
    Extract chunks from documents
    """
    from langchain_text_splitters import RecursiveCharacterTextSplitter

    chunks = []

    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
    for i, text in enumerate(document.texts):
        splits = text_splitter.split_text(text)
        for split in splits:
            chunks.append(TextChunk(chunk=split, page_number=i+1))

    return chunks


class TextEmbeddingExtractor(IndexifyFunction):
    name = "text-embedding-extractor"
    description = "Extractor class that captures an embedding model"
@@ -85,3 +103,33 @@ def run(self, document: Document) -> List[ImageWithEmbedding]:
                )
            )
        return embedding


class ImageEmbeddingDoclingExtractor(IndexifyFunction):
    name = "image-embedding-docling"
    description = "Extractor class that captures an embedding model"
    image = st_image

    def __init__(self):
        super().__init__()
        self.model = SentenceTransformer("clip-ViT-B-32")

    def run(self, document: PDFParserDoclingOutput) -> List[ImageWithEmbedding]:
        import io
        from PIL import Image as PILImage

        embeddings = []
        for i, image_str in enumerate(document.images):
            img_bytes = io.BytesIO(base64.b64decode(image_str))
            img_bytes.seek(0)
            img_emb = self.model.encode(PILImage.open(img_bytes))
            img_bytes.seek(0)
            embeddings.append(
                ImageWithEmbedding(
                    embedding=img_emb,
                    image_bytes=img_bytes.getvalue(),
                    page_number=i+1,
                )
            )

        return embeddings
3 changes: 3 additions & 0 deletions examples/pdf_document_extraction/images.py
@@ -23,6 +23,7 @@
.run("pip install langchain")
.run("pip install pillow")
.run("pip install py-inkwell")
.run("pip install opentelemetry-api")
)

lance_image = (
@@ -42,4 +43,6 @@
.run("apt install -y tesseract-ocr")
.run("apt install -y libtesseract-dev")
.run('pip install "py-inkwell[inference]"')
.run('pip install docling')
.run("pip install elastic-transport")
)
52 changes: 52 additions & 0 deletions examples/pdf_document_extraction/pdf_parser_docling.py
@@ -0,0 +1,52 @@
from common_objects import PDFParserDoclingOutput
from indexify.functions_sdk.data_objects import File
from indexify.functions_sdk.indexify_functions import IndexifyFunction

from images import inkwell_image_gpu


class PDFParserDocling(IndexifyFunction):
    name = "pdf-parse-docling"
    description = "Parser class that captures a pdf file"
    # Change to gpu_image to use GPU
    image = inkwell_image_gpu

    def __init__(self):
        super().__init__()

    def run(self, file: File) -> PDFParserDoclingOutput:
        from docling.datamodel.pipeline_options import PdfPipelineOptions
        IMAGE_RESOLUTION_SCALE = 2.0
        pipeline_options = PdfPipelineOptions()
        pipeline_options.images_scale = IMAGE_RESOLUTION_SCALE
        pipeline_options.generate_page_images = True

        from docling.document_converter import DocumentConverter, PdfFormatOption
        from docling.datamodel.base_models import InputFormat

        import tempfile
        with tempfile.NamedTemporaryFile(mode="wb", suffix=".pdf") as f:
            f.write(file.data)
            f.flush()  # ensure the full PDF is on disk before Docling reads the temp file
            converter = DocumentConverter(
                format_options={
                    InputFormat.PDF: PdfFormatOption(pipeline_options=pipeline_options)
                }
            )
            result = converter.convert(f.name)

            texts = []
            for i in range(len(result.pages)):
                page_result = result.document.export_to_markdown(page_no=i+1)
                texts.append(page_result)

            images = []
            for element, _level in result.document.iterate_items():
                from docling_core.types.doc import ImageRefMode, PictureItem, TableItem
                if isinstance(element, PictureItem):
                    pil_image = element.get_image(result.document)

                    # Using docling APIs to avoid confusion.
                    b64 = element._image_to_base64(pil_image)
                    images.append(b64)

            return PDFParserDoclingOutput(texts=texts, images=images)
12 changes: 9 additions & 3 deletions examples/pdf_document_extraction/requirements.txt
@@ -1,4 +1,10 @@
httpx
indexify
pydantic
py-inkwell
chromadb
docling==2.14.0
docling-core
sentence-transformers
chromadb
elasticsearch
py-inkwell[api]
langchain-text-splitters
elastic-transport
