diff --git a/applications/rag/example_notebooks/ray-hf-cloudsql-latest.ipynb b/applications/rag/example_notebooks/ray-hf-cloudsql-latest.ipynb deleted file mode 100644 index d207f2d11..000000000 --- a/applications/rag/example_notebooks/ray-hf-cloudsql-latest.ipynb +++ /dev/null @@ -1,298 +0,0 @@ -{ - "cells": [ - { - "cell_type": "code", - "execution_count": null, - "id": "f80fac00-27c3-4f4e-8a5e-c9277bd559bc", - "metadata": {}, - "outputs": [], - "source": [ - "# install dependencies\n", - "!pip install ray[default]==2.9.3" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "a8f5810e-6e64-48c4-b525-824ddbff4373", - "metadata": {}, - "outputs": [], - "source": [ - "! mkdir -p test" - ] - }, - { - "cell_type": "code", - "execution_count": 3, - "id": "584589d6-b197-46ab-b863-e4de1f365bf2", - "metadata": {}, - "outputs": [], - "source": [ - "%%writefile test/test.py\n", - "# Comment out the above line if you want to see notebook print out, but the line is required for the actual ray job\n", - "\n", - "import os\n", - "import uuid\n", - "import ray\n", - "from langchain.document_loaders import ArxivLoader\n", - "from langchain.text_splitter import RecursiveCharacterTextSplitter\n", - "from sentence_transformers import SentenceTransformer\n", - "from typing import List\n", - "import torch\n", - "from datasets import load_dataset_builder, load_dataset, Dataset\n", - "from huggingface_hub import snapshot_download\n", - "from google.cloud.sql.connector import Connector, IPTypes\n", - "import sqlalchemy\n", - "\n", - "# initialize parameters\n", - "INSTANCE_CONNECTION_NAME = \"::pgvector-instance\" # Modify the project and region based on your setting\n", - "print(f\"Your instance connection name is: {INSTANCE_CONNECTION_NAME}\")\n", - "DB_USER = \"rag-user-notebook\" # Modify this based on your setting\n", - "DB_PASS = \"\" # Modify this based on your setting\n", - "DB_NAME = \"pgvector-database\"\n", - "\n", - "# initialize Connector object\n", - "connector = Connector()\n", - "\n", - "# function to return the database connection object\n", - "def getconn():\n", - " conn = connector.connect(\n", - " INSTANCE_CONNECTION_NAME,\n", - " \"pg8000\",\n", - " user=DB_USER,\n", - " password=DB_PASS,\n", - " db=DB_NAME,\n", - " ip_type=IPTypes.PRIVATE\n", - " )\n", - " return conn\n", - "\n", - "# create connection pool with 'creator' argument to our connection object function\n", - "pool = sqlalchemy.create_engine(\n", - " \"postgresql+pg8000://\",\n", - " creator=getconn,\n", - ")\n", - "\n", - "# Get data from dataset\n", - "DATASET = 'wiki_dpr' # Huggingface Dataset to use\n", - "DATASET_SUBSET = 'psgs_w100.multiset.compressed.no_embeddings'\n", - "SHARED_DATA_BASEPATH='/data/rag/st'\n", - "SENTENCE_TRANSFORMER_MODEL = 'intfloat/multilingual-e5-small' # Transformer to use for converting text chunks to vector embeddings\n", - "SENTENCE_TRANSFORMER_MODEL_PATH_NAME='models--intfloat--multilingual-e5-small' # the downloaded model path takes this form for a given model name\n", - "SENTENCE_TRANSFORMER_MODEL_SNAPSHOT=\"ffdcc22a9a5c973ef0470385cef91e1ecb461d9f\" # specific snapshot of the model to use\n", - "SENTENCE_TRANSFORMER_MODEL_PATH = SHARED_DATA_BASEPATH + '/' + SENTENCE_TRANSFORMER_MODEL_PATH_NAME + '/snapshots/' + SENTENCE_TRANSFORMER_MODEL_SNAPSHOT # the path where the model is downloaded one time\n", - "\n", - "BATCH_SIZE = 100\n", - "CHUNK_SIZE = 1000 # text chunk sizes which will be converted to vector embeddings\n", - "CHUNK_OVERLAP = 10\n", - "TABLE_NAME = 'huggingface_db' # CloudSQL table name\n", - "DIMENSION = 384 # Embeddings size\n", - "WORKING_DATASET_SIZE = 100 # number of dataset rows used for the example\n", - "ACTOR_POOL_SIZE = 1 # number of actors for the distributed map_batches function\n", - "\n", - "class Embed:\n", - " def __init__(self):\n", - " print(\"torch cuda version\", torch.version.cuda)\n", - " device=\"cpu\"\n", - " if torch.cuda.is_available():\n", - " print(\"device cuda found\")\n", - " device=\"cuda\"\n", - "\n", - " print (\"reading sentence transformer model from cache path:\", SENTENCE_TRANSFORMER_MODEL_PATH)\n", - " self.transformer = SentenceTransformer(SENTENCE_TRANSFORMER_MODEL_PATH, device=device)\n", - " self.splitter = RecursiveCharacterTextSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP, length_function=len)\n", - "\n", - " def __call__(self, text_batch: List[str]):\n", - " text = text_batch[\"item\"]\n", - " # print(\"type(text)=\", type(text), \"type(text_batch)=\", type(text_batch))\n", - " chunks = []\n", - " for data in text:\n", - " splits = self.splitter.split_text(data)\n", - " # print(\"len(data)\", len(data), \"len(splits)=\", len(splits))\n", - " chunks.extend(splits)\n", - "\n", - " embeddings = self.transformer.encode(\n", - " chunks,\n", - " batch_size=BATCH_SIZE\n", - " ).tolist()\n", - " print(\"len(chunks)=\", len(chunks), \", len(emb)=\", len(embeddings))\n", - " return {'results':list(zip(chunks, embeddings))}\n", - "\n", - "\n", - "# prepare the persistent shared directory to store artifacts needed for the ray workers\n", - "os.makedirs(SHARED_DATA_BASEPATH, exist_ok=True)\n", - "\n", - "# One time download of the sentence transformer model to a shared persistent storage available to the ray workers\n", - "snapshot_download(repo_id=SENTENCE_TRANSFORMER_MODEL, revision=SENTENCE_TRANSFORMER_MODEL_SNAPSHOT, cache_dir=SHARED_DATA_BASEPATH)\n", - "\n", - "print(\"WORKING_DATASET_SIZE=\", WORKING_DATASET_SIZE)\n", - "\n", - "# Process the dataset first\n", - "# Load the HF dataset in streaming mode\n", - "dataset = load_dataset(DATASET, DATASET_SUBSET, split='train', streaming=True)\n", - "sub_data_dataset = dataset.take(WORKING_DATASET_SIZE)\n", - "test_dataset = Dataset.from_list(list(sub_data_dataset))\n", - "\n", - "# Wrap the HF data with a ray data\n", - "ray_ds = ray.data.from_items(test_dataset)\n", - "print(ray_ds.schema)\n", - "\n", - "# Distributed flat map to extract the raw text fields.\n", - "ds_batch = ray_ds.flat_map(lambda row: [{'item': row[\"text\"].replace(\"\\n\", \" \")}])\n", - "print(ds_batch.schema)\n", - "\n", - "# Distributed map batches to create chunks out of each row, and fetch the vector embeddings by running inference on the sentence transformer\n", - "ds_embed = ds_batch.map_batches(\n", - " Embed,\n", - " compute=ray.data.ActorPoolStrategy(size=ACTOR_POOL_SIZE),\n", - " batch_size=BATCH_SIZE, # Large batch size to maximize GPU utilization.\n", - " num_gpus=1, # 1 GPU for each actor.\n", - " # num_cpus=1,\n", - ")\n", - "\n", - "print(\"Embeddings ray dataset\", ds_embed.schema)\n", - "\n", - "data_text = \"\"\n", - "data_emb = \"\"\n", - "\n", - "# connect to connection pool\n", - "with pool.connect() as db_conn:\n", - " db_conn.execute(\n", - " sqlalchemy.text(\n", - " \"CREATE EXTENSION IF NOT EXISTS vector;\"\n", - " )\n", - " )\n", - " db_conn.commit()\n", - "\n", - " # create huggingface_db table\n", - " create_table_query = \"CREATE TABLE IF NOT EXISTS \" + TABLE_NAME + \" ( id VARCHAR(255) NOT NULL, text TEXT NOT NULL, text_embedding vector(384) NOT NULL, PRIMARY KEY (id));\"\n", - " db_conn.execute(\n", - " sqlalchemy.text(create_table_query)\n", - " )\n", - " # commit transaction (SQLAlchemy v2.X.X is commit as you go)\n", - " db_conn.commit()\n", - " print(\"Created table=\", TABLE_NAME)\n", - "\n", - " # TODO: Fix workaround access grant for the frontend to access the table.\n", - " grant_access_stmt = \"GRANT SELECT on \" + TABLE_NAME + \" to \\\"rag-user\\\";\"\n", - " db_conn.execute(\n", - " sqlalchemy.text(grant_access_stmt)\n", - " )\n", - "\n", - " insert_stmt = sqlalchemy.text(\n", - " \"INSERT INTO huggingface_db (id, text, text_embedding) VALUES (:id, :text, :text_embedding)\",\n", - " )\n", - " for output in ds_embed.iter_rows():\n", - " # print (\"type of embeddings\", type(output[\"results\"][1]), \"len embeddings\", len(output[\"results\"][1]))\n", - " # restrict the text string to be less than 65535\n", - " data_text = output[\"results\"][0][:65535]\n", - " # vector data pass in needs to be a string \n", - " data_emb = \",\".join(map(str, output[\"results\"][1]))\n", - " data_emb = \"[\" + data_emb + \"]\"\n", - " # print(\"text_embedding is \", data_emb)\n", - " id = uuid.uuid4()\n", - "\n", - " # insert entries into table\n", - " # print(\"insert into huggingface_db\")\n", - " db_conn.execute(insert_stmt, parameters={\"id\": id, \"text\": data_text, \"text_embedding\": data_emb})\n", - " \n", - "\n", - " # batch commit transactions\n", - " db_conn.commit()\n", - "\n", - " # query and fetch table\n", - " results = db_conn.execute(sqlalchemy.text(\"SELECT * FROM huggingface_db\")).fetchall()\n", - " # uncomment to show results to verify or debug\n", - " # for row in results:\n", - " # print(row)\n", - "\n", - " # verify results\n", - " transformer = SentenceTransformer(SENTENCE_TRANSFORMER_MODEL)\n", - " query_text = \"Each family would then have two or three small pieces of land scattered about the village, which they used to grow crops. For many years people lived in the village and then in 1845 Famine struck in Achill as it did in the rest of Ireland\" \n", - " query_emb = transformer.encode(query_text).tolist()\n", - " query_request = \"SELECT id, text, text_embedding, 1 - ('[\" + \",\".join(map(str, query_emb)) + \"]' <=> text_embedding) AS cosine_similarity FROM \" + TABLE_NAME + \" ORDER BY cosine_similarity DESC LIMIT 5;\" \n", - " query_results = db_conn.execute(sqlalchemy.text(query_request)).fetchall()\n", - " db_conn.commit()\n", - " print(\"print query_results, the 1st one is the hit\") \n", - " for row in query_results:\n", - " print(row)\n", - "\n", - "# cleanup connector object\n", - "connector.close()\n", - "print (\"end job\")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "376bb34d-5031-42f8-bf09-e80b5cba65a8", - "metadata": {}, - "outputs": [], - "source": [ - "import ray\n", - "from ray.job_submission import JobSubmissionClient\n", - "client = JobSubmissionClient(\"ray://ray-cluster-kuberay-head-svc:10001\")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "f4324c4d-1f56-4b06-9004-96ede8683dee", - "metadata": {}, - "outputs": [], - "source": [ - "job_id = client.submit_job(\n", - " entrypoint=\"python test.py\",\n", - " # Path to the local directory that contains the entrypoint file.\n", - " runtime_env={\n", - " \"working_dir\": \"/home/jovyan/test\", # upload the local working directory to ray workers\n", - " \"pip\": [\n", - " \"langchain\",\n", - " \"transformers\",\n", - " \"sentence-transformers\",\n", - " \"pyarrow\",\n", - " \"datasets\",\n", - " \"torch==2.0.1\",\n", - " \"cloud-sql-python-connector[pg8000]\",\n", - " \"SQLAlchemy\",\n", - " \"huggingface_hub\",\n", - " ]\n", - " }\n", - ")\n", - "print(\"jobid:\", job_id)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "5f93500d-99fd-4fe2-afed-506de0446b14", - "metadata": {}, - "outputs": [], - "source": [ - "# Need to run kubectl port-forward -n service/ray-cluster-kuberay-head-svc 8265:8265 to see the UI\n", - "# Fetch job status\n", - "!ray job status {job_id} --address \"ray://ray-cluster-kuberay-head-svc:10001\" " - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3 (ipykernel)", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.10.11" - } - }, - "nbformat": 4, - "nbformat_minor": 5 -}