diff --git a/README.md b/README.md index 6572be9..f0d75b1 100644 --- a/README.md +++ b/README.md @@ -24,19 +24,13 @@ From doc: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-com ## How to run Kubernetes/Helm 1. `cd sfmlops-helm` and `helm dependency build` to fetch all dependencies 2. Both install and upgrade the main chart: `helm upgrade --install --create-namespace -n mlops sfmlops-helm ./ -f values.yaml -f values-ray.yaml` -3. Deploy Kafka: `helm -n kafka upgrade --install kafka-release oci://registry-1.docker.io/bitnamicharts/kafka --create-namespace --version 23.0.7 -f values-kafka.yaml` +3. Deploy Kafka: + 1. `helm repo add bitnami https://charts.bitnami.com/bitnami` + 2. `helm -n kafka upgrade --install kafka-release oci://registry-1.docker.io/bitnamicharts/kafka --create-namespace --version 23.0.7 -f values-kafka.yaml` 4. Deploy Airflow: 1. `helm repo add apache-airflow https://airflow.apache.org` 2. `helm -n airflow upgrade --install airflow apache-airflow/airflow --create-namespace --version 1.13.1 -f values-airflow.yaml` 5. Forward Airflow UI port, so we can access: `kubectl port-forward svc/airflow-webserver 8080:8080 --namespace airflow` -6. Install Ray (KubeRay): - 1. Deploy KubeRay operator: - 1. `helm repo add kuberay https://ray-project.github.io/kuberay-helm/ && helm repo update` - 2. `helm upgrade --install -n mlops kuberay-operator kuberay/kuberay-operator --version 1.1.0-rc.0` - 2. Deploy RayCluster Custom Resource (CR) - 1. `helm upgrade --install -n mlops raycluster kuberay/ray-cluster --version 1.1.0-rc.0 --set 'image.tag=2.9.3-py39-cpu-aarch64'` - 2. Verify by `kubectl get rayclusters` -7. Forward Ray Dashboard: `kubectl port-forward --address 0.0.0.0 service/ray-head-service 8265:8265` **Note:** If you want to change namespace `kafka` and/or release name `kafka-release` of Kafka, please also change them in `values.yaml` and `KAFKA_BOOTSTRAP_SERVER` env var in `values-airflow.yaml`. They are also used in templating. 
@@ -52,18 +46,13 @@ Ray Dashboard: 8265 Nginx: 80 Forecast service: 4242 (proxied by nginx) Training service: 4243 (proxied by nginx) - -### bitnami/kafka helm install -1. `helm repo add bitnami https://charts.bitnami.com/bitnami` -2. `cd sfmlops-helm` and `helm dependency build` to fetch all dependencies -3. Run using `helm install sfmlops-helm sfmlops-helm/ -f values.yaml -f values-kafka.yaml` - -### temp -explicitly install kafka after main chart -`helm install kafka-release oci://registry-1.docker.io/bitnamicharts/kafka --version 28.0.0 -f values-kafka.yaml` +Postgres: 5432 +PGadmin: 16543 +Kafka: 9092 +Kafka UI: 8800 ### Note on Kafka Docker Compose and Helm -Kafka services on Docker Compose and Halm are different in settings, mainly in Docker Compose, we use KRaft for config management (which is newer), but in Helm +Kafka services on Docker Compose and Helm are different in settings, mainly in Docker Compose, we use KRaft for config management (which is newer), but in Helm, we use ZooKeeper because, honestly, I didn't manage to pull it off with KRaft, sorry :'( (It's quite complex). 
### Note on Stream processing options There are a few options we can do to consume the stream data from Kafka producer and save to Postgres @@ -114,4 +103,6 @@ If we restart the Ray container, all previous job history will be gone because R ## References - Airflow Helm: https://airflow.apache.org/docs/helm-chart/stable/index.html +- Airflow Helm default values.yaml: https://github.com/apache/airflow/blob/main/chart/values.yaml +- Ray sample config: https://github.com/ray-project/kuberay/tree/master/ray-operator/config/samples - Bitnami Kafka Helm: https://github.com/bitnami/charts/tree/main/bitnami/kafka diff --git a/dev_files/post_service.py b/dev_files/post_service.py index 5344e86..e9b9150 100644 --- a/dev_files/post_service.py +++ b/dev_files/post_service.py @@ -19,19 +19,7 @@ def wait_until_status( body = [ { - "store_id": "1", - "product_name": "product_A", - "begin_date": "2023-03-01T00:00:00Z", - "end_date": "2023-03-07T00:00:00Z", - }, - { - "store_id": "2", - "product_name": "product_A", - "begin_date": "2023-03-01T00:00:00Z", - "end_date": "2023-03-07T00:00:00Z", - }, - { - "store_id": "3", + "store_id": "1000", "product_name": "product_A", "begin_date": "2023-03-01T00:00:00Z", "end_date": "2023-03-07T00:00:00Z", @@ -42,18 +30,20 @@ def wait_until_status( # resp = requests.post("http://localhost:4243/train", json=body) # reverse proxy test (nginx) -# resp = requests.post("http://localhost/api/trainers/train", json=body) -# resp = requests.post("http://localhost/api/trainers/1000/product_A/train", json=body) -resp = requests.post("http://localhost/api/forecasters/forecast", json=body) +# resp = requests.post("http://localhost/api/trainers/train") +resp = requests.post("http://localhost/api/trainers/1000/product_A/train") +# resp = requests.post("http://localhost/api/forecasters/forecast", json=body) +# resp = requests.get("http://localhost/") +print("status_code:", resp.status_code) print(resp.raw) resp_json = resp.json() pprint(resp_json, indent=4) 
-print("Watching training task status...") -status_to_wait_for = {"SUCCEEDED", "STOPPED", "FAILED"} -wait_until_status( - endpoint=f"http://localhost/api/trainers/training_job_status/{resp_json['train_job_id']}", - status_to_wait_for=status_to_wait_for, - poll_interval=5, - timeout_seconds=60 * 30, # 30 mins -) +# print("Watching training task status...") +# status_to_wait_for = {"SUCCEEDED", "STOPPED", "FAILED"} +# wait_until_status( +# endpoint=f"http://localhost/api/trainers/training_job_status/{resp_json['train_job_id']}", +# status_to_wait_for=status_to_wait_for, +# poll_interval=5, +# timeout_seconds=60 * 30, # 30 mins +# ) diff --git a/docker-compose.yml b/docker-compose.yml index da3dd93..eacc925 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -84,6 +84,7 @@ services: - .env environment: - KAFKA_BOOTSTRAP_SERVER=kafka:9092 + - DB_CONNECTION_URL=postgresql://spark_user:SuperSecurePwdHere@postgres:${POSTGRES_PORT}/spark_pg_db networks: - forecast-network volumes: @@ -126,7 +127,8 @@ services: env_file: - .env environment: - - NGINX_HOST_NAME=nginx + - TRAINING_SERVICE_SERVER=nginx + - TRAINING_SERVICE_URL_PREFIX=api/trainers/ # trailing / is important - FORECAST_ENDPOINT_URL=http://nginx/api/forecasters/forecast - DB_CONNECTION_URL=postgresql://spark_user:SuperSecurePwdHere@postgres:${POSTGRES_PORT}/spark_pg_db networks: @@ -146,6 +148,9 @@ services: TRAINING_SERVICE_PORT: ${TRAINING_SERVICE_PORT} env_file: - .env + environment: + - RAY_DASHBOARD_HOST=ray + - RAY_DASHBOARD_PORT=8265 networks: - forecast-network depends_on: @@ -175,6 +180,8 @@ services: - RAY_PROMETHEUS_HOST=http://prometheus:9090 - RAY_PROMETHEUS_NAME=Prometheus - RAY_GRAFANA_IFRAME_HOST=http://localhost:${GRAFANA_PORT} + - MLFLOW_TRACKING_URI=http://mlflow:5050 + - DB_CONNECTION_URL=postgresql://spark_user:SuperSecurePwdHere@postgres:${POSTGRES_PORT}/spark_pg_db networks: - forecast-network - backend-network diff --git a/services/airflow/dags/task_operators.py 
b/services/airflow/dags/task_operators.py index 718ed17..8b1a00b 100644 --- a/services/airflow/dags/task_operators.py +++ b/services/airflow/dags/task_operators.py @@ -13,7 +13,11 @@ prepare_db, ) -NGINX_HOST_NAME = os.getenv("NGINX_HOST_NAME", "nginx") +TRAINING_SERVICE_SERVER = os.getenv("TRAINING_SERVICE_SERVER", "nginx") +TRAINING_SERVICE_URL_PREFIX = os.getenv("TRAINING_SERVICE_URL_PREFIX", "api/trainers/") +FORECAST_ENDPOINT_URL = os.getenv( + "FORECAST_ENDPOINT_URL", f"http://nginx/api/forecasters/forecast" +) SALES_TABLE_NAME = os.getenv("SALES_TABLE_NAME", "rossman_sales") FORECAST_TABLE_NAME = os.getenv("FORECAST_TABLE_NAME", "forecast_results") POSTGRES_PORT = os.getenv("POSTGRES_PORT", "5432") @@ -54,7 +58,7 @@ def check_status(**kwargs): # New function for task print("Watching training task status...") status_to_wait_for = {"SUCCEEDED", "STOPPED", "FAILED"} wait_until_status( - endpoint=f"http://{NGINX_HOST_NAME}/api/trainers/training_job_status/{resp_json['train_job_id']}", + endpoint=f"http://{TRAINING_SERVICE_SERVER}/{TRAINING_SERVICE_URL_PREFIX}training_job_status/{resp_json['train_job_id']}", status_to_wait_for=status_to_wait_for, poll_interval=5, timeout_seconds=60 * 30, @@ -96,9 +100,7 @@ def post_forecast(**kwargs): # New function for task ti = kwargs["ti"] requst_body = ti.xcom_pull(task_ids="build_request_body") # Pull XCom print(f"type: {type(requst_body)}") - resp = requests.post( - f"http://{NGINX_HOST_NAME}/api/forecasters/forecast", json=requst_body - ) + resp = requests.post(FORECAST_ENDPOINT_URL, json=requst_body) print(f"Status: {resp.status_code}") return resp.json() diff --git a/services/airflow/dags/train_predict_to_db_dag.py b/services/airflow/dags/train_predict_to_db_dag.py index 454ca00..4cb6340 100644 --- a/services/airflow/dags/train_predict_to_db_dag.py +++ b/services/airflow/dags/train_predict_to_db_dag.py @@ -11,7 +11,8 @@ save_forecasts_to_db, ) -NGINX_HOST_NAME = os.getenv("NGINX_HOST_NAME", "nginx") 
+TRAINING_SERVICE_SERVER = os.getenv("TRAINING_SERVICE_SERVER", "nginx") +TRAINING_SERVICE_URL_PREFIX = os.getenv("TRAINING_SERVICE_URL_PREFIX", "api/trainers/") with DAG( dag_id="train_predict_to_db_dag", @@ -35,8 +36,8 @@ # POST to the training service trigger_train_task = BashOperator( task_id="call_train_service", - bash_command=f"curl -X POST http://{NGINX_HOST_NAME}/api/trainers/train", - # bash_command=f"curl -X POST http://{NGINX_HOST_NAME}/api/trainers/100/product_A/train", + bash_command=f"curl -X POST http://{TRAINING_SERVICE_SERVER}/{TRAINING_SERVICE_URL_PREFIX}train", + # bash_command=f"curl -X POST http://{TRAINING_SERVICE_SERVER}/{TRAINING_SERVICE_URL_PREFIX}100/product_A/train", ) # Poll the status of the training job diff --git a/services/web-ui/app/main.py b/services/web-ui/app/main.py index 6749383..c17609d 100644 --- a/services/web-ui/app/main.py +++ b/services/web-ui/app/main.py @@ -17,7 +17,8 @@ df_from_forecast_response, ) -NGINX_HOST_NAME = os.getenv("NGINX_HOST_NAME", "nginx") +TRAINING_SERVICE_SERVER = os.getenv("TRAINING_SERVICE_SERVER", "nginx") +TRAINING_SERVICE_URL_PREFIX = os.getenv("TRAINING_SERVICE_URL_PREFIX", "api/trainers/") FORECAST_ENDPOINT_URL = os.getenv( "FORECAST_ENDPOINT_URL", f"http://nginx/api/forecasters/forecast" ) @@ -101,14 +102,14 @@ f"Store ID: {input_store_id} | Product name: {input_product_name}" ) resp = requests.post( - f"http://{NGINX_HOST_NAME}/api/trainers/{input_store_id}/{input_product_name}/train" + f"http://{TRAINING_SERVICE_SERVER}/{TRAINING_SERVICE_URL_PREFIX}{input_store_id}/{input_product_name}/train" ) resp_json = resp.json() st.json(resp_json) st.write("Watching training task status...") status_to_wait_for = {"SUCCEEDED", "STOPPED", "FAILED"} exit_status = wait_until_status( - endpoint=f"http://{NGINX_HOST_NAME}/api/trainers/training_job_status/{resp_json['train_job_id']}", + endpoint=f"http://{TRAINING_SERVICE_SERVER}/{TRAINING_SERVICE_URL_PREFIX}training_job_status/{resp_json['train_job_id']}", 
status_to_wait_for=status_to_wait_for, poll_interval=1, timeout_seconds=30, # 30 seconds diff --git a/sfmlops-helm/Chart.yaml b/sfmlops-helm/Chart.yaml index bbc06b5..91c8aed 100644 --- a/sfmlops-helm/Chart.yaml +++ b/sfmlops-helm/Chart.yaml @@ -33,6 +33,3 @@ dependencies: - name: ray-cluster version: 1.1.0-rc.0 repository: https://ray-project.github.io/kuberay-helm/ -# - name: kafka -# version: 23.0.7 -# repository: oci://registry-1.docker.io/bitnamicharts diff --git a/sfmlops-helm/templates/forecast-service-deployment-service.yaml b/sfmlops-helm/templates/forecast-service-deployment-service.yaml index 2f984dd..a96dd31 100644 --- a/sfmlops-helm/templates/forecast-service-deployment-service.yaml +++ b/sfmlops-helm/templates/forecast-service-deployment-service.yaml @@ -12,11 +12,18 @@ spec: labels: component: {{ .Values.forecastServiceLabel }} spec: + volumes: + - name: mlflow-data + persistentVolumeClaim: + claimName: {{ .Values.mlflowPvcName }} containers: - name: forecast-service image: ariya23156/sfmlops-forecast-service ports: - containerPort: {{ .Values.forecastServicePort }} + volumeMounts: + - name: mlflow-data + mountPath: {{ .Values.mlflowArtifactRoot }} envFrom: - configMapRef: name: global-configmap diff --git a/sfmlops-helm/templates/ray-cluster.yaml b/sfmlops-helm/templates/ray-cluster.yaml index 2ab5bfa..4821427 100644 --- a/sfmlops-helm/templates/ray-cluster.yaml +++ b/sfmlops-helm/templates/ray-cluster.yaml @@ -35,6 +35,7 @@ spec: containers: - name: ray-head image: ariya23156/sfmlops-ray + imagePullPolicy: Always ports: - containerPort: 6379 name: gcs @@ -49,9 +50,16 @@ spec: envFrom: - configMapRef: name: global-configmap + env: + - name: MLFLOW_TRACKING_URI + value: "http://mlflow-service:5050" + - name: DB_CONNECTION_URL + value: "postgresql://spark_user:SuperSecurePwdHere@postgres-service:5432/spark_pg_db" volumeMounts: - mountPath: /tmp/ray name: ray-logs + - name: mlflow-data + mountPath: {{ .Values.mlflowArtifactRoot }} # The resource 
requests and limits in this config are too small for production! # For an example with more realistic resource configuration, see # ray-cluster.autoscaler.large.yaml. @@ -72,6 +80,9 @@ spec: volumes: - name: ray-logs emptyDir: {} + - name: mlflow-data + persistentVolumeClaim: + claimName: {{ .Values.mlflowPvcName }} workerGroupSpecs: # the pod replicas in this group typed worker - replicas: 1 @@ -99,6 +110,7 @@ spec: containers: - name: ray-worker image: ariya23156/sfmlops-ray + imagePullPolicy: Always lifecycle: preStop: exec: @@ -106,9 +118,11 @@ spec: envFrom: - configMapRef: name: global-configmap - containerEnv: + env: - name: MLFLOW_TRACKING_URI - value: "http://mlflow-service:{{ .Values.mlflowPort }}" + value: "http://mlflow-service:5050" + - name: DB_CONNECTION_URL + value: "postgresql://spark_user:SuperSecurePwdHere@postgres-service:5432/spark_pg_db" # use volumeMounts.Optional. # Refer to https://kubernetes.io/docs/concepts/storage/volumes/ volumeMounts: diff --git a/sfmlops-helm/templates/web-ui-deployment-service.yaml b/sfmlops-helm/templates/web-ui-deployment-service.yaml index 8508cf6..101187d 100644 --- a/sfmlops-helm/templates/web-ui-deployment-service.yaml +++ b/sfmlops-helm/templates/web-ui-deployment-service.yaml @@ -40,10 +40,12 @@ spec: - configMapRef: name: global-configmap env: - - name: NGINX_HOST_NAME - value: localhost + - name: TRAINING_SERVICE_SERVER + value: "training-service-service.mlops.svc.cluster.local:4243" + - name: TRAINING_SERVICE_URL_PREFIX + value: # this is intentional to pass in an empty string - name: FORECAST_ENDPOINT_URL - value: http://localhost/api/forecasters/forecast + value: "http://forecast-service-service.mlops.svc.cluster.local:4242/forecast" - name: DB_CONNECTION_URL value: "postgresql://spark_user:SuperSecurePwdHere@postgres-service:{{ .Values.postgresPort }}/spark_pg_db" --- @@ -59,18 +61,3 @@ spec: ports: - port: {{ .Values.webUiPort }} targetPort: {{ .Values.webUiPort }} - -# --- -# apiVersion: v1 -# kind: 
Service -# metadata: -# name: streamlit-service -# spec: -# type: LoadBalancer -# selector: -# component: {{ .Values.webUiLabel }} -# ports: -# - name: streamlit-port -# protocol: TCP -# port: 8000 -# targetPort: {{ .Values.webUiPort }} diff --git a/sfmlops-helm/values-airflow.yaml b/sfmlops-helm/values-airflow.yaml index 168fbff..2bded86 100644 --- a/sfmlops-helm/values-airflow.yaml +++ b/sfmlops-helm/values-airflow.yaml @@ -6,6 +6,7 @@ images: airflow: repository: ariya23156/sfmlops-airflow-spark tag: latest + pullPolicy: Always web: service: @@ -45,5 +46,9 @@ env: value: "jdbc:postgresql://postgres-service.mlops.svc.cluster.local:5432/spark_pg_db" - name: KAFKA_BOOTSTRAP_SERVER value: "kafka-release.kafka.svc.cluster.local:9092" - - name: NGINX_HOST_NAME - value: localhost + - name: TRAINING_SERVICE_SERVER + value: "training-service-service.mlops.svc.cluster.local:4243" + - name: TRAINING_SERVICE_URL_PREFIX + value: # this is intentional to pass in an empty string + - name: FORECAST_ENDPOINT_URL + value: "http://forecast-service-service.mlops.svc.cluster.local:4242/forecast" diff --git a/sfmlops-helm/values-ray.yaml b/sfmlops-helm/values-ray.yaml index 7244068..fbfbc1a 100644 --- a/sfmlops-helm/values-ray.yaml +++ b/sfmlops-helm/values-ray.yaml @@ -1,12 +1,2 @@ image: tag: 2.9.3-py39-cpu-aarch64 - -head: - containerEnv: - - name: MLFLOW_TRACKING_URI - value: "http://mlflow-service:5050" - -worker: - containerEnv: - - name: MLFLOW_TRACKING_URI - value: "http://mlflow-service:5050"