
Commit

Refactored training server & port
jomariya23156 committed Mar 31, 2024
1 parent 5a9b2b1 commit 2ef2c24
Showing 12 changed files with 82 additions and 90 deletions.
29 changes: 10 additions & 19 deletions README.md
@@ -24,19 +24,13 @@ From doc: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-com
## How to run Kubernetes/Helm
1. `cd sfmlops-helm` and `helm dependency build` to fetch all dependencies
2. Install (or upgrade) the main chart: `helm upgrade --install --create-namespace -n mlops sfmlops-helm ./ -f values.yaml -f values-ray.yaml`
3. Deploy Kafka: `helm -n kafka upgrade --install kafka-release oci://registry-1.docker.io/bitnamicharts/kafka --create-namespace --version 23.0.7 -f values-kafka.yaml`
3. Deploy Kafka:
1. `helm repo add bitnami https://charts.bitnami.com/bitnami`
2. `helm -n kafka upgrade --install kafka-release oci://registry-1.docker.io/bitnamicharts/kafka --create-namespace --version 23.0.7 -f values-kafka.yaml`
4. Deploy Airflow:
1. `helm repo add apache-airflow https://airflow.apache.org`
2. `helm -n airflow upgrade --install airflow apache-airflow/airflow --create-namespace --version 1.13.1 -f values-airflow.yaml`
5. Forward the Airflow UI port so we can access it: `kubectl port-forward svc/airflow-webserver 8080:8080 --namespace airflow`
6. Install Ray (KubeRay):
1. Deploy KubeRay operator:
1. `helm repo add kuberay https://ray-project.github.io/kuberay-helm/ && helm repo update`
2. `helm upgrade --install -n mlops kuberay-operator kuberay/kuberay-operator --version 1.1.0-rc.0`
2. Deploy RayCluster Custom Resource (CR)
1. `helm upgrade --install -n mlops raycluster kuberay/ray-cluster --version 1.1.0-rc.0 --set 'image.tag=2.9.3-py39-cpu-aarch64'`
2. Verify by `kubectl get rayclusters`
7. Forward Ray Dashboard: `kubectl port-forward --address 0.0.0.0 service/ray-head-service 8265:8265`

**Note:** If you want to change the namespace `kafka` and/or the release name `kafka-release` of Kafka, please also change them in `values.yaml` and in the `KAFKA_BOOTSTRAP_SERVER` env var in `values-airflow.yaml`, since they are also used in templating.

@@ -52,18 +46,13 @@ Ray Dashboard: 8265
Nginx: 80
Forecast service: 4242 (proxied by nginx)
Training service: 4243 (proxied by nginx)

### bitnami/kafka helm install
1. `helm repo add bitnami https://charts.bitnami.com/bitnami`
2. `cd sfmlops-helm` and `helm dependency build` to fetch all dependencies
3. Run using `helm install sfmlops-helm sfmlops-helm/ -f values.yaml -f values-kafka.yaml`

### temp
explicitly install kafka after main chart
`helm install kafka-release oci://registry-1.docker.io/bitnamicharts/kafka --version 28.0.0 -f values-kafka.yaml`
Postgres: 5432
PGadmin: 16543
Kafka: 9092
Kafka UI: 8800

### Note on Kafka Docker Compose and Helm
Kafka services on Docker Compose and Helm are different in settings, mainly in Docker Compose, we use KRaft for config management (which is newer), but in Helm
Kafka services on Docker Compose and Helm differ in settings: in Docker Compose we use KRaft for config management (which is newer), but in Helm we use ZooKeeper because, honestly, I haven't managed to pull it off with KRaft, sorry :'( (it's quite complex).

### Note on Stream processing options
There are a few options for consuming the stream data from the Kafka producer and saving it to Postgres
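
For illustration, here is a minimal sketch of the simplest option: a plain Python consumer that reads events from the Kafka topic and inserts them into Postgres. The topic name, message fields, and table columns below are assumptions for the sketch, not the repo's actual implementation; only the `rossman_sales` table name, bootstrap server, and connection URL appear elsewhere in this commit.

```python
import json

import psycopg2
from kafka import KafkaConsumer  # kafka-python

# Assumed topic name; the bootstrap server matches KAFKA_BOOTSTRAP_SERVER in docker-compose.yml.
consumer = KafkaConsumer(
    "sales-topic",
    bootstrap_servers="kafka:9092",
    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
)

# Connection URL as used elsewhere in this commit.
conn = psycopg2.connect("postgresql://spark_user:SuperSecurePwdHere@postgres:5432/spark_pg_db")
conn.autocommit = True

with conn.cursor() as cur:
    for msg in consumer:
        record = msg.value
        # Assumed columns; align them with the real rossman_sales schema.
        cur.execute(
            "INSERT INTO rossman_sales (store_id, product_name, sale_date, sales) "
            "VALUES (%s, %s, %s, %s)",
            (record["store_id"], record["product_name"], record["date"], record["sales"]),
        )
```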
@@ -114,4 +103,6 @@ If we restart the Ray container, all previous job history will be gone because Ray

## References
- Airflow Helm: https://airflow.apache.org/docs/helm-chart/stable/index.html
- Airflow Helm default values.yaml: https://github.com/apache/airflow/blob/main/chart/values.yaml
- Ray sample config: https://github.com/ray-project/kuberay/tree/master/ray-operator/config/samples
- Bitnami Kafka Helm: https://github.com/bitnami/charts/tree/main/bitnami/kafka
38 changes: 14 additions & 24 deletions dev_files/post_service.py
@@ -19,19 +19,7 @@ def wait_until_status(

body = [
{
"store_id": "1",
"product_name": "product_A",
"begin_date": "2023-03-01T00:00:00Z",
"end_date": "2023-03-07T00:00:00Z",
},
{
"store_id": "2",
"product_name": "product_A",
"begin_date": "2023-03-01T00:00:00Z",
"end_date": "2023-03-07T00:00:00Z",
},
{
"store_id": "3",
"store_id": "1000",
"product_name": "product_A",
"begin_date": "2023-03-01T00:00:00Z",
"end_date": "2023-03-07T00:00:00Z",
@@ -42,18 +30,20 @@
# resp = requests.post("http://localhost:4243/train", json=body)

# reverse proxy test (nginx)
# resp = requests.post("http://localhost/api/trainers/train", json=body)
# resp = requests.post("http://localhost/api/trainers/1000/product_A/train", json=body)
resp = requests.post("http://localhost/api/forecasters/forecast", json=body)
# resp = requests.post("http://localhost/api/trainers/train")
resp = requests.post("http://localhost/api/trainers/1000/product_A/train")
# resp = requests.post("http://localhost/api/forecasters/forecast", json=body)
# resp = requests.get("http://localhost/")
print("status_code:", resp.status_code)
print(resp.raw)
resp_json = resp.json()
pprint(resp_json, indent=4)

print("Watching training task status...")
status_to_wait_for = {"SUCCEEDED", "STOPPED", "FAILED"}
wait_until_status(
endpoint=f"http://localhost/api/trainers/training_job_status/{resp_json['train_job_id']}",
status_to_wait_for=status_to_wait_for,
poll_interval=5,
timeout_seconds=60 * 30, # 30 mins
)
# print("Watching training task status...")
# status_to_wait_for = {"SUCCEEDED", "STOPPED", "FAILED"}
# wait_until_status(
# endpoint=f"http://localhost/api/trainers/training_job_status/{resp_json['train_job_id']}",
# status_to_wait_for=status_to_wait_for,
# poll_interval=5,
# timeout_seconds=60 * 30, # 30 mins
# )
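
The `wait_until_status` helper itself is collapsed out of this diff. Based purely on how it is called here and in the Airflow/web-UI code below, a poller along these lines would fit; the response shape (a JSON body with a `status` field) and the return value are assumptions:

```python
import time

import requests


def wait_until_status(
    endpoint: str,
    status_to_wait_for: set,
    poll_interval: int = 5,
    timeout_seconds: int = 60 * 30,
) -> str:
    """Poll a job-status endpoint until it reports one of the given terminal statuses."""
    deadline = time.time() + timeout_seconds
    while time.time() < deadline:
        # Assumed response shape: {"status": "RUNNING"}, {"status": "SUCCEEDED"}, etc.
        status = requests.get(endpoint).json()["status"]
        print(f"Current job status: {status}")
        if status in status_to_wait_for:
            return status
        time.sleep(poll_interval)
    raise TimeoutError(f"Job did not reach {status_to_wait_for} within {timeout_seconds}s")
```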
9 changes: 8 additions & 1 deletion docker-compose.yml
@@ -84,6 +84,7 @@ services:
- .env
environment:
- KAFKA_BOOTSTRAP_SERVER=kafka:9092
- DB_CONNECTION_URL=postgresql://spark_user:SuperSecurePwdHere@postgres:${POSTGRES_PORT}/spark_pg_db
networks:
- forecast-network
volumes:
@@ -126,7 +127,8 @@ services:
env_file:
- .env
environment:
- NGINX_HOST_NAME=nginx
- TRAINING_SERVICE_SERVER=nginx
- TRAINING_SERVICE_URL_PREFIX=api/trainers/ # trailing / is important
- FORECAST_ENDPOINT_URL=http://nginx/api/forecasters/forecast
- DB_CONNECTION_URL=postgresql://spark_user:SuperSecurePwdHere@postgres:${POSTGRES_PORT}/spark_pg_db
networks:
@@ -146,6 +148,9 @@
TRAINING_SERVICE_PORT: ${TRAINING_SERVICE_PORT}
env_file:
- .env
environment:
- RAY_DASHBOARD_HOST=ray
- RAY_DASHBOARD_PORT=8265
networks:
- forecast-network
depends_on:
@@ -175,6 +180,8 @@
- RAY_PROMETHEUS_HOST=http://prometheus:9090
- RAY_PROMETHEUS_NAME=Prometheus
- RAY_GRAFANA_IFRAME_HOST=http://localhost:${GRAFANA_PORT}
- MLFLOW_TRACKING_URI=http://mlflow:5050
- DB_CONNECTION_URL=postgresql://spark_user:SuperSecurePwdHere@postgres:${POSTGRES_PORT}/spark_pg_db
networks:
- forecast-network
- backend-network
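
The trailing slash called out above matters because the services build URLs by plain string interpolation: with the Docker Compose values the pieces join into the nginx route, while with an empty prefix (as the Kubernetes values later in this commit use) the same f-string hits the service directly. A quick illustration, with a hypothetical helper name:

```python
def train_url(server: str, prefix: str, path: str) -> str:
    # prefix must end with "/" (or be empty); nothing joins the pieces for you
    return f"http://{server}/{prefix}{path}"

# Docker Compose: behind the nginx reverse proxy
print(train_url("nginx", "api/trainers/", "train"))
# -> http://nginx/api/trainers/train

# Kubernetes: direct to the service, with an empty prefix
print(train_url("training-service-service.mlops.svc.cluster.local:4243", "", "train"))
# -> http://training-service-service.mlops.svc.cluster.local:4243/train
```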
12 changes: 7 additions & 5 deletions services/airflow/dags/task_operators.py
@@ -13,7 +13,11 @@
prepare_db,
)

NGINX_HOST_NAME = os.getenv("NGINX_HOST_NAME", "nginx")
TRAINING_SERVICE_SERVER = os.getenv("TRAINING_SERVICE_SERVER", "nginx")
TRAINING_SERVICE_URL_PREFIX = os.getenv("TRAINING_SERVICE_URL_PREFIX", "api/trainers/")
FORECAST_ENDPOINT_URL = os.getenv(
"FORECAST_ENDPOINT_URL", f"http://nginx/api/forecasters/forecast"
)
SALES_TABLE_NAME = os.getenv("SALES_TABLE_NAME", "rossman_sales")
FORECAST_TABLE_NAME = os.getenv("FORECAST_TABLE_NAME", "forecast_results")
POSTGRES_PORT = os.getenv("POSTGRES_PORT", "5432")
@@ -54,7 +58,7 @@ def check_status(**kwargs): # New function for task
print("Watching training task status...")
status_to_wait_for = {"SUCCEEDED", "STOPPED", "FAILED"}
wait_until_status(
endpoint=f"http://{NGINX_HOST_NAME}/api/trainers/training_job_status/{resp_json['train_job_id']}",
endpoint=f"http://{TRAINING_SERVICE_SERVER}/{TRAINING_SERVICE_URL_PREFIX}training_job_status/{resp_json['train_job_id']}",
status_to_wait_for=status_to_wait_for,
poll_interval=5,
timeout_seconds=60 * 30,
@@ -96,9 +100,7 @@ def post_forecast(**kwargs): # New function for task
ti = kwargs["ti"]
request_body = ti.xcom_pull(task_ids="build_request_body") # Pull XCom
print(f"type: {type(request_body)}")
resp = requests.post(
f"http://{NGINX_HOST_NAME}/api/forecasters/forecast", json=request_body
)
resp = requests.post(FORECAST_ENDPOINT_URL, json=request_body)
print(f"Status: {resp.status_code}")
return resp.json()

7 changes: 4 additions & 3 deletions services/airflow/dags/train_predict_to_db_dag.py
@@ -11,7 +11,8 @@
save_forecasts_to_db,
)

NGINX_HOST_NAME = os.getenv("NGINX_HOST_NAME", "nginx")
TRAINING_SERVICE_SERVER = os.getenv("TRAINING_SERVICE_SERVER", "nginx")
TRAINING_SERVICE_URL_PREFIX = os.getenv("TRAINING_SERVICE_URL_PREFIX", "api/trainers/")

with DAG(
dag_id="train_predict_to_db_dag",
@@ -35,8 +36,8 @@
# POST to the training service
trigger_train_task = BashOperator(
task_id="call_train_service",
bash_command=f"curl -X POST http://{NGINX_HOST_NAME}/api/trainers/train",
# bash_command=f"curl -X POST http://{NGINX_HOST_NAME}/api/trainers/100/product_A/train",
bash_command=f"curl -X POST http://{TRAINING_SERVICE_SERVER}/{TRAINING_SERVICE_URL_PREFIX}train",
# bash_command=f"curl -X POST http://{TRAINING_SERVICE_SERVER}/{TRAINING_SERVICE_URL_PREFIX}100/product_A/train",
)

# Poll the status of the training job
7 changes: 4 additions & 3 deletions services/web-ui/app/main.py
@@ -17,7 +17,8 @@
df_from_forecast_response,
)

NGINX_HOST_NAME = os.getenv("NGINX_HOST_NAME", "nginx")
TRAINING_SERVICE_SERVER = os.getenv("TRAINING_SERVICE_SERVER", "nginx")
TRAINING_SERVICE_URL_PREFIX = os.getenv("TRAINING_SERVICE_URL_PREFIX", "api/trainers/")
FORECAST_ENDPOINT_URL = os.getenv(
"FORECAST_ENDPOINT_URL", f"http://nginx/api/forecasters/forecast"
)
@@ -101,14 +102,14 @@
f"Store ID: {input_store_id} | Product name: {input_product_name}"
)
resp = requests.post(
f"http://{NGINX_HOST_NAME}/api/trainers/{input_store_id}/{input_product_name}/train"
f"http://{TRAINING_SERVICE_SERVER}/{TRAINING_SERVICE_URL_PREFIX}{input_store_id}/{input_product_name}/train"
)
resp_json = resp.json()
st.json(resp_json)
st.write("Watching training task status...")
status_to_wait_for = {"SUCCEEDED", "STOPPED", "FAILED"}
exit_status = wait_until_status(
endpoint=f"http://{NGINX_HOST_NAME}/api/trainers/training_job_status/{resp_json['train_job_id']}",
endpoint=f"http://{TRAINING_SERVICE_SERVER}/{TRAINING_SERVICE_URL_PREFIX}training_job_status/{resp_json['train_job_id']}",
status_to_wait_for=status_to_wait_for,
poll_interval=1,
timeout_seconds=30, # 30 seconds
3 changes: 0 additions & 3 deletions sfmlops-helm/Chart.yaml
@@ -33,6 +33,3 @@ dependencies:
- name: ray-cluster
version: 1.1.0-rc.0
repository: https://ray-project.github.io/kuberay-helm/
# - name: kafka
# version: 23.0.7
# repository: oci://registry-1.docker.io/bitnamicharts
sfmlops-helm/templates/forecast-service-deployment-service.yaml
@@ -12,11 +12,18 @@ spec:
labels:
component: {{ .Values.forecastServiceLabel }}
spec:
volumes:
- name: mlflow-data
persistentVolumeClaim:
claimName: {{ .Values.mlflowPvcName }}
containers:
- name: forecast-service
image: ariya23156/sfmlops-forecast-service
ports:
- containerPort: {{ .Values.forecastServicePort }}
volumeMounts:
- name: mlflow-data
mountPath: {{ .Values.mlflowArtifactRoot }}
envFrom:
- configMapRef:
name: global-configmap
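
Context for the new volume mount: with a file-based artifact store, the MLflow tracking server records artifact *paths*, not the bytes, so any pod that loads models needs the same artifact root mounted at the same path as the trainers. A hedged sketch of the loading side; the registered model name and version are illustrative:

```python
import mlflow

mlflow.set_tracking_uri("http://mlflow-service:5050")

# The artifact path returned by the tracking server resolves against the
# shared mlflow-data PVC, mounted at the same mlflowArtifactRoot as in the
# Ray pods. Model name/version here are illustrative.
model = mlflow.pyfunc.load_model("models:/store-sales-forecaster/1")
```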
18 changes: 16 additions & 2 deletions sfmlops-helm/templates/ray-cluster.yaml
@@ -35,6 +35,7 @@ spec:
containers:
- name: ray-head
image: ariya23156/sfmlops-ray
imagePullPolicy: Always
ports:
- containerPort: 6379
name: gcs
@@ -49,9 +50,16 @@
envFrom:
- configMapRef:
name: global-configmap
env:
- name: MLFLOW_TRACKING_URI
value: "http://mlflow-service:5050"
- name: DB_CONNECTION_URL
value: "postgresql://spark_user:SuperSecurePwdHere@postgres-service:5432/spark_pg_db"
volumeMounts:
- mountPath: /tmp/ray
name: ray-logs
- name: mlflow-data
mountPath: {{ .Values.mlflowArtifactRoot }}
# The resource requests and limits in this config are too small for production!
# For an example with more realistic resource configuration, see
# ray-cluster.autoscaler.large.yaml.
@@ -72,6 +80,9 @@ spec:
volumes:
- name: ray-logs
emptyDir: {}
- name: mlflow-data
persistentVolumeClaim:
claimName: {{ .Values.mlflowPvcName }}
workerGroupSpecs:
# the pod replicas in this group typed worker
- replicas: 1
@@ -99,16 +110,19 @@
containers:
- name: ray-worker
image: ariya23156/sfmlops-ray
imagePullPolicy: Always
lifecycle:
preStop:
exec:
command: ["/bin/sh","-c","ray stop"]
envFrom:
- configMapRef:
name: global-configmap
containerEnv:
env:
- name: MLFLOW_TRACKING_URI
value: "http://mlflow-service:{{ .Values.mlflowPort }}"
value: "http://mlflow-service:5050"
- name: DB_CONNECTION_URL
value: "postgresql://spark_user:SuperSecurePwdHere@postgres-service:5432/spark_pg_db"
# use volumeMounts.Optional.
# Refer to https://kubernetes.io/docs/concepts/storage/volumes/
volumeMounts:
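
The same `MLFLOW_TRACKING_URI` and `DB_CONNECTION_URL` env vars are set on both the head and worker pod specs because a training task may be scheduled on any of them, and the training code reads the values from its own environment. A minimal sketch of the consuming side, assuming the trainer logs to MLflow in the usual way:

```python
import os

import mlflow

# Training code may run on the Ray head or any worker, so both pod specs
# above carry the same env vars.
mlflow.set_tracking_uri(os.getenv("MLFLOW_TRACKING_URI", "http://mlflow-service:5050"))
db_url = os.getenv("DB_CONNECTION_URL")  # used by trainers to read sales data

with mlflow.start_run(run_name="example"):
    mlflow.log_param("example_param", 1)
```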
23 changes: 5 additions & 18 deletions sfmlops-helm/templates/web-ui-deployment-service.yaml
@@ -40,10 +40,12 @@ spec:
- configMapRef:
name: global-configmap
env:
- name: NGINX_HOST_NAME
value: localhost
- name: TRAINING_SERVICE_SERVER
value: "training-service-service.mlops.svc.cluster.local:4243"
- name: TRAINING_SERVICE_URL_PREFIX
value: # intentionally blank to pass an empty string
- name: FORECAST_ENDPOINT_URL
value: http://localhost/api/forecasters/forecast
value: "http://forecast-service-service.mlops.svc.cluster.local:4242/forecast"
- name: DB_CONNECTION_URL
value: "postgresql://spark_user:SuperSecurePwdHere@postgres-service:{{ .Values.postgresPort }}/spark_pg_db"
---
@@ -59,18 +61,3 @@
ports:
- port: {{ .Values.webUiPort }}
targetPort: {{ .Values.webUiPort }}

# ---
# apiVersion: v1
# kind: Service
# metadata:
# name: streamlit-service
# spec:
# type: LoadBalancer
# selector:
# component: {{ .Values.webUiLabel }}
# ports:
# - name: streamlit-port
# protocol: TCP
# port: 8000
# targetPort: {{ .Values.webUiPort }}
9 changes: 7 additions & 2 deletions sfmlops-helm/values-airflow.yaml
@@ -6,6 +6,7 @@ images:
airflow:
repository: ariya23156/sfmlops-airflow-spark
tag: latest
pullPolicy: Always

web:
service:
@@ -45,5 +46,9 @@
value: "jdbc:postgresql://postgres-service.mlops.svc.cluster.local:5432/spark_pg_db"
- name: KAFKA_BOOTSTRAP_SERVER
value: "kafka-release.kafka.svc.cluster.local:9092"
- name: NGINX_HOST_NAME
value: localhost
- name: TRAINING_SERVICE_SERVER
value: "training-service-service.mlops.svc.cluster.local:4243"
- name: TRAINING_SERVICE_URL_PREFIX
value: # intentionally blank to pass an empty string
- name: FORECAST_ENDPOINT_URL
value: "http://forecast-service-service.mlops.svc.cluster.local:4242/forecast"
10 changes: 0 additions & 10 deletions sfmlops-helm/values-ray.yaml
Original file line number Diff line number Diff line change
@@ -1,12 +1,2 @@
image:
tag: 2.9.3-py39-cpu-aarch64

head:
containerEnv:
- name: MLFLOW_TRACKING_URI
value: "http://mlflow-service:5050"

worker:
containerEnv:
- name: MLFLOW_TRACKING_URI
value: "http://mlflow-service:5050"
