From 905131828c8a67648538372b45788cc382112e79 Mon Sep 17 00:00:00 2001 From: Hadar Cohen Date: Wed, 30 Jul 2025 12:17:28 +0300 Subject: [PATCH 01/10] ignore train-workflow.yaml since it's generated by train-workflow.py --- .gitignore | 5 +- train-workflow.yaml | 622 -------------------------------------------- 2 files changed, 4 insertions(+), 623 deletions(-) delete mode 100644 train-workflow.yaml diff --git a/.gitignore b/.gitignore index 147289b..8ffc60c 100644 --- a/.gitignore +++ b/.gitignore @@ -173,4 +173,7 @@ cython_debug/ # PyPI configuration file .pypirc -*.crt \ No newline at end of file +*.crt + +# Generated by train-workflow.py +train-workflow.yaml \ No newline at end of file diff --git a/train-workflow.yaml b/train-workflow.yaml deleted file mode 100644 index 5d0ec47..0000000 --- a/train-workflow.yaml +++ /dev/null @@ -1,622 +0,0 @@ -# PIPELINE DEFINITION -# Name: train-workflow -components: - comp-create-model-registry: - executorLabel: exec-create-model-registry - inputDefinitions: - parameters: - author: - parameterType: STRING - bucket_name: - parameterType: STRING - host: - parameterType: STRING - new_version: - parameterType: STRING - object_name: - parameterType: STRING - torch_version: - parameterType: STRING - user_token: - parameterType: STRING - comp-fetch-api-credentials: - executorLabel: exec-fetch-api-credentials - outputDefinitions: - parameters: - author: - parameterType: STRING - host: - parameterType: STRING - user_token: - parameterType: STRING - comp-generate-candidates: - executorLabel: exec-generate-candidates - inputDefinitions: - artifacts: - item_df_input: - artifactType: - schemaTitle: system.Dataset - schemaVersion: 0.0.1 - item_input_model: - artifactType: - schemaTitle: system.Model - schemaVersion: 0.0.1 - models_definition_input: - artifactType: - schemaTitle: system.Artifact - schemaVersion: 0.0.1 - user_df_input: - artifactType: - schemaTitle: system.Dataset - schemaVersion: 0.0.1 - user_input_model: - artifactType: - 
schemaTitle: system.Model - schemaVersion: 0.0.1 - comp-load-data-from-feast: - executorLabel: exec-load-data-from-feast - outputDefinitions: - artifacts: - interaction_df_output: - artifactType: - schemaTitle: system.Dataset - schemaVersion: 0.0.1 - item_df_output: - artifactType: - schemaTitle: system.Dataset - schemaVersion: 0.0.1 - user_df_output: - artifactType: - schemaTitle: system.Dataset - schemaVersion: 0.0.1 - comp-train-model: - executorLabel: exec-train-model - inputDefinitions: - artifacts: - interaction_df_input: - artifactType: - schemaTitle: system.Dataset - schemaVersion: 0.0.1 - item_df_input: - artifactType: - schemaTitle: system.Dataset - schemaVersion: 0.0.1 - user_df_input: - artifactType: - schemaTitle: system.Dataset - schemaVersion: 0.0.1 - outputDefinitions: - artifacts: - item_output_model: - artifactType: - schemaTitle: system.Model - schemaVersion: 0.0.1 - models_definition_output: - artifactType: - schemaTitle: system.Artifact - schemaVersion: 0.0.1 - user_output_model: - artifactType: - schemaTitle: system.Model - schemaVersion: 0.0.1 - parameters: - bucket_name: - parameterType: STRING - new_version: - parameterType: STRING - object_name: - parameterType: STRING - torch_version: - parameterType: STRING -deploymentSpec: - executors: - exec-create-model-registry: - container: - args: - - --executor_input - - '{{$}}' - - --function_to_execute - - create_model_registry - command: - - sh - - -c - - "\nif ! 
[ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ - \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ - \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.11.0'\ - \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ - \ python3 -m pip install --quiet --no-warn-script-location 'model_registry'\ - \ && \"$0\" \"$@\"\n" - - sh - - -ec - - 'program_path=$(mktemp -d) - - - printf "%s" "$0" > "$program_path/ephemeral_component.py" - - _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" - - ' - - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ - \ *\n\ndef create_model_registry(\n author: str,\n user_token: str,\n\ - \ host: str,\n bucket_name: str,\n new_version: str,\n object_name:\ - \ str,\n torch_version: str\n):\n import os\n from model_registry\ - \ import ModelRegistry, utils\n\n registry = ModelRegistry(host, author=author,\ - \ user_token=user_token)\n # Use DNS with the namespace 'rhoai-model-registries'\n\ - \ model_endpoint = f\"http://minio.rhoai-model-registries.svc.cluster.local:{os.environ.get('MINIO_PORT',\ - \ '9000')}\"\n\n registry.register_model(\n name=\"item-encoder\"\ - ,\n uri=utils.s3_uri_from(endpoint=model_endpoint, bucket=bucket_name,\ - \ path=object_name, region=os.environ.get(\"REGION\", \"us-east-1\")),\n\ - \ version=new_version,\n model_format_name=\"\ - pytorch\",\n model_format_version=torch_version,\n \ - \ storage_key= \"minio\",\n )\n\n" - image: quay.io/ecosystem-appeng/rec-sys-app:0.0.43 - exec-fetch-api-credentials: - container: - args: - - --executor_input - - '{{$}}' - - --function_to_execute - - fetch_api_credentials - command: - - sh - - -c - - "\nif ! 
[ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ - \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ - \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.11.0'\ - \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ - $0\" \"$@\"\n" - - sh - - -ec - - 'program_path=$(mktemp -d) - - - printf "%s" "$0" > "$program_path/ephemeral_component.py" - - _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" - - ' - - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ - \ *\n\ndef fetch_api_credentials() -> NamedTuple('ocContext', [('author',\ - \ str), ('user_token', str), ('host', str)]):\n import os, subprocess\n\ - \ from typing import NamedTuple\n\n author_value = subprocess.run(\"\ - oc whoami\", shell=True, capture_output=True, text=True, check=True).stdout.strip()\n\ - \ user_token_value = subprocess.run(\"oc whoami -t\", shell=True, capture_output=True,\ - \ text=True, check=True).stdout.strip()\n\n mr_namespace = os.getenv(\"\ - MODEL_REGISTRY_NAMESPACE\",\"rhoai-model-registries\")\n mr_container\ - \ = os.getenv(\"MODEL_REGISTRY_CONTAINER\",\"modelregistry-sample\")\n\n\ - \ cmd = f\"oc get svc {mr_container} -n {mr_namespace} -o json | jq '.metadata.annotations.\\\ - \"routing.opendatahub.io/external-address-rest\\\"'\"\n host_output =\ - \ subprocess.run(cmd, shell=True, capture_output=True, text=True, check=True).stdout.strip()\n\ - \ host_value = f\"https://{host_output[1:-5]}\" # Remove quotes and :443\n\ - \n ocContext = NamedTuple('ocContext', [('author', str), ('user_token',\ - \ str), ('host', str)])\n return ocContext(author_value, user_token_value,\ - \ host_value)\n\n" - env: - - name: MODEL_REGISTRY_NAMESPACE - - name: MODEL_REGISTRY_CONTAINER - image: quay.io/rh-ee-ofridman/model-registry-python-oc - exec-generate-candidates: - container: - args: - - 
--executor_input - - '{{$}}' - - --function_to_execute - - generate_candidates - command: - - sh - - -c - - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ - \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ - \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.11.0'\ - \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ - $0\" \"$@\"\n" - - sh - - -ec - - 'program_path=$(mktemp -d) - - - printf "%s" "$0" > "$program_path/ephemeral_component.py" - - _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" - - ' - - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ - \ *\n\ndef generate_candidates(item_input_model: Input[Model], user_input_model:\ - \ Input[Model], item_df_input: Input[Dataset], user_df_input: Input[Dataset],\ - \ models_definition_input: Input[Artifact]):\n from feast import FeatureStore\n\ - \ from feast.data_source import PushMode\n from models.data_util import\ - \ data_preproccess\n from models.entity_tower import EntityTower\n \ - \ from service.clip_encoder import ClipEncoder\n import pandas as pd\n\ - \ import numpy as np\n from datetime import datetime\n import torch\n\ - \ import subprocess\n import json\n\n with open(models_definition_input.path,\ - \ 'r') as f:\n models_definition :dict = json.load(f)\n\n result\ - \ = subprocess.run(\n [\"/bin/bash\", \"-c\", \"ls && ./entry_point.sh\"\ - ],\n capture_output=True, # Capture stdout and stderr\n text=True,\ - \ # Return output as strings (not bytes)\n # check=True\ - \ # Raise an error if the command fails\n )\n\n # Print\ - \ the stdout\n print(\"Standard Output:\")\n print(result.stdout)\n\ - \n # Print the stderr (if any)\n print(\"Standard Error:\")\n print(result.stderr)\n\ - \ with open('feature_repo/feature_store.yaml', 'r') as file:\n \ - \ print(file.read())\n\n store = 
FeatureStore(repo_path=\"feature_repo/\"\ - )\n\n # device = torch.device('cuda' if torch.cuda.is_available() else\ - \ 'cpu')\n device = torch.device('cpu')\n item_encoder = EntityTower(models_definition['items_num_numerical'],\ - \ models_definition['items_num_categorical'])\n user_encoder = EntityTower(models_definition['users_num_numerical'],\ - \ models_definition['users_num_categorical'])\n item_encoder.load_state_dict(torch.load(item_input_model.path))\n\ - \ user_encoder.load_state_dict(torch.load(user_input_model.path))\n \ - \ item_encoder.to(device)\n user_encoder.to(device)\n item_encoder.eval()\n\ - \ user_encoder.eval()\n # load item and user dataframes\n item_df\ - \ = pd.read_parquet(item_df_input.path)\n user_df = pd.read_parquet(user_df_input.path)\n\ - \n # Create a new table to be push to the online store\n item_embed_df\ - \ = item_df[['item_id']].copy()\n user_embed_df = user_df[['user_id']].copy()\n\ - \n # Encode the items and users\n proccessed_items = data_preproccess(item_df)\n\ - \ proccessed_users = data_preproccess(user_df)\n # Move tensors to\ - \ device\n proccessed_items = {key: value.to(device) if type(value) ==\ - \ torch.Tensor else value for key, value in proccessed_items.items()}\n\ - \ proccessed_users = {key: value.to(device) if type(value) == torch.Tensor\ - \ else value for key, value in proccessed_users.items()}\n item_embed_df['embedding']\ - \ = item_encoder(**proccessed_items).detach().numpy().tolist()\n user_embed_df['embedding']\ - \ = user_encoder(**proccessed_users).detach().numpy().tolist()\n\n #\ - \ Add the currnet timestamp\n current_time = datetime.now()\n item_embed_df['event_timestamp']\ - \ = current_time\n user_embed_df['event_timestamp'] = current_time\n\n\ - \ # Push the new embedding to the offline and online store\n store.push('item_embed_push_source',\ - \ item_embed_df, to=PushMode.ONLINE, allow_registry_cache=False)\n store.push('user_embed_push_source',\ - \ user_embed_df, to=PushMode.ONLINE, 
allow_registry_cache=False)\n\n \ - \ # Store the embedding of text features for search by text\n item_text_features_embed\ - \ = item_df[['item_id']].copy()\n # item_text_features_embed['product_name']\ - \ = proccessed_items['text_features'].detach()[:, 0, :].numpy().tolist()\n\ - \ item_text_features_embed['about_product_embedding'] = proccessed_items['text_features'].detach()[:,\ - \ 1, :].numpy().tolist()\n item_text_features_embed['event_timestamp']\ - \ = current_time\n\n store.push('item_textual_features_embed', item_text_features_embed,\ - \ to=PushMode.ONLINE, allow_registry_cache=False)\n\n # Store the embedding\ - \ of clip features for search by image\n clip_encoder = ClipEncoder()\n\ - \ item_clip_features_embed = clip_encoder.clip_embeddings(item_df)\n\ - \ store.push('item_clip_features_embed', item_clip_features_embed, to=PushMode.ONLINE,\ - \ allow_registry_cache=False)\n\n # Materilize the online store\n \ - \ store.materialize_incremental(current_time, feature_views=['item_embedding',\ - \ 'user_items', 'item_features', 'item_textual_features_embed'])\n\n \ - \ # Calculate user recommendations for each user\n item_embedding_view\ - \ = 'item_embedding'\n k = 64\n item_recommendation = []\n for\ - \ user_embed in user_embed_df['embedding']:\n item_recommendation.append(\n\ - \ store.retrieve_online_documents(\n query=user_embed,\n\ - \ top_k=k,\n features=[f'{item_embedding_view}:item_id']\n\ - \ ).to_df()['item_id'].to_list()\n )\n\n # Pushing\ - \ the calculated items to the online store\n user_items_df = user_embed_df[['user_id']].copy()\n\ - \ user_items_df['event_timestamp'] = current_time\n user_items_df['top_k_item_ids']\ - \ = item_recommendation\n\n store.push('user_items_push_source', user_items_df,\ - \ to=PushMode.ONLINE, allow_registry_cache=False)\n\n" - env: - - name: FEAST_PROJECT_NAME - value: feast_edb_rec_sys - - name: FEAST_REGISTRY_URL - value: feast-feast-edb-rec-sys-registry.rec-sys.svc.cluster.local - image: 
quay.io/ecosystem-appeng/rec-sys-app:0.0.43 - exec-load-data-from-feast: - container: - args: - - --executor_input - - '{{$}}' - - --function_to_execute - - load_data_from_feast - command: - - sh - - -c - - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ - \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ - \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.11.0'\ - \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ - \ python3 -m pip install --quiet --no-warn-script-location 'psycopg2-binary'\ - \ && \"$0\" \"$@\"\n" - - sh - - -ec - - 'program_path=$(mktemp -d) - - - printf "%s" "$0" > "$program_path/ephemeral_component.py" - - _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" - - ' - - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ - \ *\n\ndef load_data_from_feast(item_df_output: Output[Dataset], user_df_output:\ - \ Output[Dataset], interaction_df_output: Output[Dataset]):\n from feast\ - \ import FeatureStore\n import pandas as pd\n import os\n from\ - \ service.dataset_provider import LocalDatasetProvider\n from sqlalchemy\ - \ import create_engine, text\n import subprocess\n\n result = subprocess.run(\n\ - \ [\"/bin/bash\", \"-c\", \"ls && ./entry_point.sh\"],\n capture_output=True,\ - \ # Capture stdout and stderr\n text=True, # Return output\ - \ as strings (not bytes)\n )\n\n # Print the stdout\n print(\"\ - Standard Output:\")\n print(result.stdout)\n\n # Print the stderr\ - \ (if any)\n print(\"Standard Error:\")\n print(result.stderr)\n\n\ - \ with open('feature_repo/feature_store.yaml', 'r') as file:\n \ - \ print(file.read())\n store = FeatureStore(repo_path=\"feature_repo/\"\ - )\n store.refresh_registry()\n print('registry refreshed')\n dataset_provider\ - \ = LocalDatasetProvider(store)\n\n # retrieve datasets for training\n\ - \ item_df = 
dataset_provider.item_df()\n user_df = dataset_provider.user_df()\n\ - \ interaction_df = dataset_provider.interaction_df()\n\n uri = os.getenv('uri',\ - \ None)\n engine = create_engine(uri)\n\n def table_exists(engine,\ - \ table_name):\n query = text(\"SELECT COUNT(*) FROM information_schema.tables\ - \ WHERE table_name = :table_name\")\n with engine.connect() as connection:\n\ - \ result = connection.execute(query, {\"table_name\": table_name}).scalar()\n\ - \ return result > 0\n\n if table_exists(engine, 'new_users'):\n\ - \ query_new_users = 'SELECT * FROM new_users'\n stream_users_df\ - \ = pd.read_sql(query_new_users, engine).rename(columns={'timestamp':'signup_date'})\n\ - \n user_df = pd.concat([user_df, stream_users_df], axis=0)\n\n \ - \ if table_exists(engine, 'stream_interaction'):\n query_positive\ - \ = 'SELECT * FROM stream_interaction'\n stream_positive_inter_df\ - \ = pd.read_sql(query_positive, engine).rename(columns={'timestamp':'event_timestamp'})\n\ - \n interaction_df = pd.concat([interaction_df, stream_positive_inter_df],\ - \ axis=0)\n\n # Pass artifacts\n item_df.to_parquet(item_df_output.path)\n\ - \ user_df.to_parquet(user_df_output.path)\n interaction_df.to_parquet(interaction_df_output.path)\n\ - \n item_df_output.metadata['format'] = 'parquet'\n user_df_output.metadata['format']\ - \ = 'parquet'\n interaction_df_output.metadata['format'] = 'parquet'\n\ - \n" - env: - - name: FEAST_PROJECT_NAME - value: feast_rec_sys - - name: FEAST_REGISTRY_URL - value: feast-feast-rec-sys-registry.rec-sys.svc.cluster.local - image: quay.io/ecosystem-appeng/rec-sys-app:0.0.43 - exec-train-model: - container: - args: - - --executor_input - - '{{$}}' - - --function_to_execute - - train_model - command: - - sh - - -c - - "\nif ! 
[ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ - \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ - \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.11.0'\ - \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ - \ python3 -m pip install --quiet --no-warn-script-location 'minio' 'psycopg2-binary'\ - \ && \"$0\" \"$@\"\n" - - sh - - -ec - - 'program_path=$(mktemp -d) - - - printf "%s" "$0" > "$program_path/ephemeral_component.py" - - _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" - - ' - - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ - \ *\n\ndef train_model(\n item_df_input: Input[Dataset],\n user_df_input:\ - \ Input[Dataset],\n interaction_df_input: Input[Dataset],\n item_output_model:\ - \ Output[Model],\n user_output_model: Output[Model],\n models_definition_output:\ - \ Output[Artifact]\n) -> NamedTuple('modelMetadata', [('bucket_name', str),\ - \ ('new_version', str), ('object_name', str), ('torch_version', str)]):\n\ - \ from models.train_two_tower import create_and_train_two_tower\n \ - \ import pandas as pd\n import torch\n import json\n import os\n\ - \ from minio import Minio\n from sqlalchemy import create_engine,\ - \ text\n\n item_df = pd.read_parquet(item_df_input.path)\n user_df\ - \ = pd.read_parquet(user_df_input.path)\n interaction_df = pd.read_parquet(interaction_df_input.path)\n\ - \n item_encoder, user_encoder, models_definition= create_and_train_two_tower(item_df,\ - \ user_df, interaction_df, return_model_definition=True)\n\n torch.save(item_encoder.state_dict(),\ - \ item_output_model.path)\n torch.save(user_encoder.state_dict(), user_output_model.path)\n\ - \ item_output_model.metadata['framework'] = 'pytorch'\n user_output_model.metadata['framework']\ - \ = 'pytorch'\n with open(models_definition_output.path, 'w') as f:\n\ - \ 
json.dump(models_definition, f)\n\n # \n engine = create_engine(os.getenv('uri',\ - \ None))\n # Check if table exists\n def table_exists(engine, table_name):\n\ - \ query = text(\"SELECT COUNT(*) FROM information_schema.tables WHERE\ - \ table_name = :table_name\")\n with engine.connect() as connection:\n\ - \ result = connection.execute(query, {\"table_name\": table_name}).scalar()\n\ - \ return result > 0\n\n if not table_exists(engine, 'model_version'):\n\ - \ # Create table if it doesn't exist\n with engine.connect()\ - \ as connection:\n connection.execute(text(\"\"\"\n \ - \ CREATE TABLE model_version (\n id SERIAL PRIMARY\ - \ KEY,\n version VARCHAR(50) NOT NULL, \n \ - \ updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP\n \ - \ );\n \"\"\"))\n new_version = '1.0.0'\n \ - \ connection.execute(text(f\"INSERT INTO model_version (version) VALUES\ - \ ('{new_version}');\"))\n connection.commit()\n else:\n \ - \ # Get last version and increment minor version by 0.0.1\n \ - \ with engine.connect() as connection:\n last_version = connection.execute(text(\"\ - SELECT version FROM model_version ORDER BY id DESC LIMIT 1\")).scalar()\n\ - \ major, minor, patch = map(int, last_version.split('.'))\n \ - \ new_version = f\"{major}.{minor}.{patch + 1}\"\n \ - \ connection.execute(text(\"UPDATE model_version SET version = :version\ - \ WHERE id = (SELECT MAX(id) FROM model_version)\"), {\"version\": new_version})\n\ - \ connection.commit()\n\n minio_client = Minio(\n endpoint=os.getenv('MINIO_HOST',\ - \ \"endpoint\") + \\\n ':' + os.getenv('MINIO_PORT', '9000'),\n\ - \ access_key=os.getenv('MINIO_ACCESS_KEY', \"access-key\"),\n \ - \ secret_key=os.getenv('MINIO_SECRET_KEY', \"secret-key\"),\n \ - \ secure=False # Set to True if using HTTPS\n )\n\n bucket_name\ - \ = \"user-encoder\"\n object_name = f\"user-encoder-{new_version}.pth\"\ - \ \n configuration = f'user-encoder-config-{new_version}.json'\n\n \ - \ # Ensure the bucket exists, create it if it doesn't\n if not 
minio_client.bucket_exists(bucket_name):\n\ - \ minio_client.make_bucket(bucket_name)\n\n minio_client.fput_object(\n\ - \ bucket_name=bucket_name,\n object_name=object_name,\n \ - \ file_path=user_output_model.path\n )\n # Save model configurations\n\ - \ minio_client.fput_object(\n bucket_name=bucket_name,\n \ - \ object_name=configuration,\n file_path=models_definition_output.path\n\ - \ )\n modelMetadata = NamedTuple('modelMetadata', [('bucket_name',\ - \ str), ('new_version', str), ('object_name', str), ('torch_version', str)])\n\ - \ return modelMetadata(bucket_name, new_version, object_name, torch.__version__[0:5])\n\ - \n" - image: quay.io/ecosystem-appeng/rec-sys-app:0.0.43 -pipelineInfo: - name: train-workflow -root: - dag: - tasks: - create-model-registry: - cachingOptions: {} - componentRef: - name: comp-create-model-registry - dependentTasks: - - fetch-api-credentials - - train-model - inputs: - parameters: - author: - taskOutputParameter: - outputParameterKey: author - producerTask: fetch-api-credentials - bucket_name: - taskOutputParameter: - outputParameterKey: bucket_name - producerTask: train-model - host: - taskOutputParameter: - outputParameterKey: host - producerTask: fetch-api-credentials - new_version: - taskOutputParameter: - outputParameterKey: new_version - producerTask: train-model - object_name: - taskOutputParameter: - outputParameterKey: object_name - producerTask: train-model - torch_version: - taskOutputParameter: - outputParameterKey: torch_version - producerTask: train-model - user_token: - taskOutputParameter: - outputParameterKey: user_token - producerTask: fetch-api-credentials - taskInfo: - name: create-model-registry - fetch-api-credentials: - cachingOptions: - enableCache: true - componentRef: - name: comp-fetch-api-credentials - taskInfo: - name: fetch-api-credentials - generate-candidates: - cachingOptions: {} - componentRef: - name: comp-generate-candidates - dependentTasks: - - load-data-from-feast - - train-model - 
inputs: - artifacts: - item_df_input: - taskOutputArtifact: - outputArtifactKey: item_df_output - producerTask: load-data-from-feast - item_input_model: - taskOutputArtifact: - outputArtifactKey: item_output_model - producerTask: train-model - models_definition_input: - taskOutputArtifact: - outputArtifactKey: models_definition_output - producerTask: train-model - user_df_input: - taskOutputArtifact: - outputArtifactKey: user_df_output - producerTask: load-data-from-feast - user_input_model: - taskOutputArtifact: - outputArtifactKey: user_output_model - producerTask: train-model - taskInfo: - name: generate-candidates - load-data-from-feast: - cachingOptions: {} - componentRef: - name: comp-load-data-from-feast - taskInfo: - name: load-data-from-feast - train-model: - cachingOptions: {} - componentRef: - name: comp-train-model - dependentTasks: - - load-data-from-feast - inputs: - artifacts: - interaction_df_input: - taskOutputArtifact: - outputArtifactKey: interaction_df_output - producerTask: load-data-from-feast - item_df_input: - taskOutputArtifact: - outputArtifactKey: item_df_output - producerTask: load-data-from-feast - user_df_input: - taskOutputArtifact: - outputArtifactKey: user_df_output - producerTask: load-data-from-feast - taskInfo: - name: train-model -schemaVersion: 2.1.0 -sdkVersion: kfp-2.11.0 ---- -platforms: - kubernetes: - deploymentSpec: - executors: - exec-create-model-registry: - secretAsEnv: - - keyToEnv: - - envVar: MINIO_HOST - secretKey: host - - envVar: MINIO_PORT - secretKey: port - secretName: ds-pipeline-s3-dspa - exec-generate-candidates: - secretAsEnv: - - keyToEnv: - - envVar: uri - secretKey: uri - - envVar: DB_PASSWORD - secretKey: password - - envVar: DB_HOST - secretKey: host - - envVar: DB_NAME - secretKey: dbname - - envVar: DB_USER - secretKey: user - - envVar: DB_PORT - secretKey: port - secretName: cluster-sample-app - secretAsVolume: - - mountPath: /app/feature_repo/secrets - optional: false - secretName: 
feast-feast-edb-rec-sys-registry-tls - exec-load-data-from-feast: - secretAsEnv: - - keyToEnv: - - envVar: uri - secretKey: uri - - envVar: DB_PASSWORD - secretKey: password - - envVar: DB_HOST - secretKey: host - - envVar: DB_NAME - secretKey: dbname - - envVar: DB_USER - secretKey: user - - envVar: DB_PORT - secretKey: port - secretName: cluster-sample-app - secretAsVolume: - - mountPath: /app/feature_repo/secrets - optional: false - secretName: feast-feast-rec-sys-registry-tls - exec-train-model: - secretAsEnv: - - keyToEnv: - - envVar: MINIO_HOST - secretKey: host - - envVar: MINIO_PORT - secretKey: port - - envVar: MINIO_ACCESS_KEY - secretKey: accesskey - - envVar: MINIO_SECRET_KEY - secretKey: secretkey - - envVar: MINIO_SECURE - secretKey: secure - secretName: ds-pipeline-s3-dspa - - keyToEnv: - - envVar: uri - secretKey: uri - secretName: cluster-sample-app From 71c3b99d1ae867ae4e2bd72d503ea2fab62cdbd7 Mon Sep 17 00:00:00 2001 From: Hadar Cohen Date: Wed, 30 Jul 2025 14:42:03 +0300 Subject: [PATCH 02/10] add new container file and explanations why it's needed --- oc-tools/Containerfile | 12 ++++++++ oc-tools/README.md | 63 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 75 insertions(+) create mode 100644 oc-tools/Containerfile create mode 100644 oc-tools/README.md diff --git a/oc-tools/Containerfile b/oc-tools/Containerfile new file mode 100644 index 0000000..2562dda --- /dev/null +++ b/oc-tools/Containerfile @@ -0,0 +1,12 @@ +FROM python:3.11-slim +WORKDIR /app +# Install curl and tar for downloading oc, then install oc CLI +RUN apt-get update && \ + apt-get install -y curl tar jq && \ + curl -L https://mirror.openshift.com/pub/openshift-v4/clients/ocp/latest/openshift-client-linux.tar.gz | tar -xz -C /usr/local/bin oc && \ + chmod +x /usr/local/bin/oc && \ + apt-get clean && \ + rm -rf /var/lib/apt/lists/* +# Install Python dependencies +RUN pip install --upgrade pip && \ + pip install --no-cache-dir model_registry==0.2.21 diff --git 
a/oc-tools/README.md b/oc-tools/README.md new file mode 100644 index 0000000..dd11dd3 --- /dev/null +++ b/oc-tools/README.md @@ -0,0 +1,63 @@ +# OpenShift CLI Tools Container + +This container image provides OpenShift CLI tools and utilities needed for cluster credential management in the recommendation system workflow. + +## Purpose + +This specialized image is used by the `fetch_cluster_credentials` component in the Kubeflow pipeline to: +- Authenticate with the OpenShift cluster +- Retrieve user tokens and cluster information +- Get Model Registry service endpoints +- Extract routing information for external services + +## Contents + +### Tools Included +- **OpenShift CLI (`oc`)** - Latest version from OpenShift mirror +- **jq** - JSON processor for parsing API responses +- **curl** - For downloading the OpenShift CLI +- **tar** - For extracting downloaded archives + +### Python Base +- **Python 3.11-slim** - Minimal Python runtime +- **model_registry** - Python package for model registry operations (installed via pip at image build time) + +## Usage in Pipeline + +This image is used by the `fetch_cluster_credentials()` function in `train-workflow.py` + +## Building the Image + +```bash +# From the oc-tools/ directory +podman build --platform linux/amd64 -t quay.io/ecosystem-appeng/model-registry . +# Push to registry +podman push quay.io/rh-ee-ofridman/model-registry-python-oc +``` + +## Why Separate from Base Image? + +This image is kept separate from the main `BASE_IMAGE` because: + +1. **Security Isolation** - OpenShift CLI tools have cluster access privileges +2. **Image Size** - ML workloads don't need cluster management tools +3. **Separation of Concerns** - Infrastructure operations vs. ML operations +4. 
**Maintenance** - Can update OC tools independently of ML dependencies + +## Environment Variables + +The component using this image expects these environment variables: +- `MODEL_REGISTRY_NAMESPACE` - Namespace where model registry is deployed +- `MODEL_REGISTRY_CONTAINER` - Name of the model registry service + +## Dependencies + +This image requires the pod to have: +- ServiceAccount with cluster read permissions +- Access to the OpenShift API server +- Network connectivity to model registry services + +## Related Components + +- `registry_model_to_model_registry()` - Consumes the credentials from this component +- `train_model()` - Provides model artifacts to be registered \ No newline at end of file From 736bae6de2b968eeadbe7c4565cb9ce8c03e6703 Mon Sep 17 00:00:00 2001 From: Hadar Cohen Date: Wed, 30 Jul 2025 14:42:22 +0300 Subject: [PATCH 03/10] change images --- train-workflow.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/train-workflow.py b/train-workflow.py index 31ef7ff..62085a3 100644 --- a/train-workflow.py +++ b/train-workflow.py @@ -5,7 +5,7 @@ from kfp.dsl import Artifact, Dataset, Input, Model, Output BASE_IMAGE = os.getenv( - "BASE_REC_SYS_IMAGE", "quay.io/ecosystem-appeng/rec-sys-app:0.0.43" + "BASE_REC_SYS_IMAGE", "quay.io/ecosystem-appeng/rec-sys-app:latest" ) @@ -312,7 +312,7 @@ def table_exists(engine, table_name): return modelMetadata(bucket_name, new_version, object_name, torch.__version__[0:5]) -@dsl.component(base_image="quay.io/rh-ee-ofridman/model-registry-python-oc") +@dsl.component(base_image="quay.io/ecosystem-appeng/model-registry:latest") def fetch_cluster_credentials() -> ( NamedTuple("ocContext", [("author", str), ("user_token", str), ("host", str)]) ): From f784dcc295560b59839873bf46ea6c17142a7542 Mon Sep 17 00:00:00 2001 From: Hadar Cohen Date: Wed, 30 Jul 2025 14:50:23 +0300 Subject: [PATCH 04/10] use the commit as a tag and push to quay.io/ecosystem-appeng/rec-sys-workflow --- 
.github/workflows/build-and-push.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/build-and-push.yml b/.github/workflows/build-and-push.yml index d9624c2..0437a06 100644 --- a/.github/workflows/build-and-push.yml +++ b/.github/workflows/build-and-push.yml @@ -34,7 +34,7 @@ jobs: - name: Set version from run number id: version run: | - echo "tag=v1.0.${GITHUB_RUN_NUMBER}" >> $GITHUB_OUTPUT + echo "tag=$(git rev-parse --short HEAD)" >> $GITHUB_OUTPUT - name: Build and push ${{ matrix.name }} uses: docker/build-push-action@v5 @@ -42,6 +42,6 @@ jobs: context: ${{ matrix.context }} file: Containerfile push: true - tags: quay.io/rh-ai-kickstart/${{ matrix.image-name }}:${{ steps.version.outputs.tag }} + tags: quay.io/ecosystem-appeng/${{ matrix.image-name }}:${{ steps.version.outputs.tag }} build-args: | IMAGE_TAG=${{ steps.version.outputs.tag }} From ab538f28779477f17f678dd29647ccd6bf0e210b Mon Sep 17 00:00:00 2001 From: Hadar Cohen Date: Wed, 30 Jul 2025 14:51:11 +0300 Subject: [PATCH 05/10] reformat the code with ruff --- train-workflow.py | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/train-workflow.py b/train-workflow.py index 62085a3..2fb3755 100644 --- a/train-workflow.py +++ b/train-workflow.py @@ -250,9 +250,7 @@ def table_exists(engine, table_name): ) new_version = "1.0.0" connection.execute( - text( - f"INSERT INTO model_version (version) VALUES " f"('{new_version}');" - ) + text(f"INSERT INTO model_version (version) VALUES ('{new_version}');") ) connection.commit() else: @@ -313,8 +311,8 @@ def table_exists(engine, table_name): @dsl.component(base_image="quay.io/ecosystem-appeng/model-registry:latest") -def fetch_cluster_credentials() -> ( - NamedTuple("ocContext", [("author", str), ("user_token", str), ("host", str)]) +def fetch_cluster_credentials() -> NamedTuple( + "ocContext", [("author", str), ("user_token", str), ("host", str)] ): import os import subprocess @@ -372,7 +370,7 
@@ def registry_model_to_model_registry( path=object_name, region=os.environ.get("REGION", "us-east-1"), ), - version=(f"{new_version}_" f"{datetime.now().strftime('%Y_%m_%d_%H_%M_%S')}"), + version=(f"{new_version}_{datetime.now().strftime('%Y_%m_%d_%H_%M_%S')}"), model_format_name="pytorch", model_format_version=torch_version, storage_key="minio", @@ -499,15 +497,11 @@ def mount_secret_feast_repository(task): ) dataset_url = os.getenv("DATASET_URL") if dataset_url is not None: - task.set_env_variable( - name="DATASET_URL", - value=dataset_url - ) + task.set_env_variable(name="DATASET_URL", value=dataset_url) @dsl.pipeline(name=os.path.basename(__file__).replace(".py", "")) def batch_recommendation(): - load_data_task = load_data_from_feast() mount_secret_feast_repository(load_data_task) # Component configurations From 68c94f7f18fc6330262f8dcfe37a4473196da862 Mon Sep 17 00:00:00 2001 From: Hadar Cohen <81298804+Hadar301@users.noreply.github.com> Date: Wed, 30 Jul 2025 15:19:06 +0300 Subject: [PATCH 06/10] Update README.md --- oc-tools/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/oc-tools/README.md b/oc-tools/README.md index dd11dd3..289a5b3 100644 --- a/oc-tools/README.md +++ b/oc-tools/README.md @@ -32,7 +32,7 @@ This image is used by the `fetch_cluster_credentials()` function in `train-workf # From the oc-tools/ directory podman build --platform linux/amd64 -t quay.io/ecosystem-appeng/model-registry . # Push to registry -podman push quay.io/rh-ee-ofridman/model-registry-python-oc +podman push quay.io/ecosystem-appeng/model-registry:latest ``` ## Why Separate from Base Image? 
@@ -60,4 +60,4 @@ This image requires the pod to have: ## Related Components - `registry_model_to_model_registry()` - Consumes the credentials from this component -- `train_model()` - Provides model artifacts to be registered \ No newline at end of file +- `train_model()` - Provides model artifacts to be registered From 85957c4a76676d580705c8b98acbea0603243b11 Mon Sep 17 00:00:00 2001 From: Hadar Cohen <81298804+Hadar301@users.noreply.github.com> Date: Wed, 30 Jul 2025 15:27:22 +0300 Subject: [PATCH 07/10] Update build-and-push.yml --- .github/workflows/build-and-push.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build-and-push.yml b/.github/workflows/build-and-push.yml index 0437a06..4bdb7a0 100644 --- a/.github/workflows/build-and-push.yml +++ b/.github/workflows/build-and-push.yml @@ -42,6 +42,6 @@ jobs: context: ${{ matrix.context }} file: Containerfile push: true - tags: quay.io/ecosystem-appeng/${{ matrix.image-name }}:${{ steps.version.outputs.tag }} + tags: quay.io/rh-ai-kickstart/${{ matrix.image-name }}:${{ steps.version.outputs.tag }} build-args: | IMAGE_TAG=${{ steps.version.outputs.tag }} From ed2ec82b3507e9c45179b26634179a8c5da4c440 Mon Sep 17 00:00:00 2001 From: Hadar Cohen <81298804+Hadar301@users.noreply.github.com> Date: Wed, 30 Jul 2025 17:09:35 +0300 Subject: [PATCH 08/10] Update build-and-push.yml --- .github/workflows/build-and-push.yml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.github/workflows/build-and-push.yml b/.github/workflows/build-and-push.yml index 4bdb7a0..7e0791d 100644 --- a/.github/workflows/build-and-push.yml +++ b/.github/workflows/build-and-push.yml @@ -42,6 +42,8 @@ jobs: context: ${{ matrix.context }} file: Containerfile push: true - tags: quay.io/rh-ai-kickstart/${{ matrix.image-name }}:${{ steps.version.outputs.tag }} + tags: | + quay.io/rh-ai-kickstart/${{ matrix.image-name }}:${{ steps.version.outputs.tag }} + quay.io/rh-ai-kickstart/${{ 
matrix.image-name }}:latest build-args: | IMAGE_TAG=${{ steps.version.outputs.tag }} From dd43af20a26f2aa32588aeaebb2a09b42d5c5a24 Mon Sep 17 00:00:00 2001 From: Hadar Cohen Date: Thu, 31 Jul 2025 11:03:58 +0300 Subject: [PATCH 09/10] update build and push and create a new ci for the new image under oc-tools. --- .../build-and-push-model-registry.yaml | 49 +++++++++++++++++++ .github/workflows/build-and-push.yml | 4 +- 2 files changed, 51 insertions(+), 2 deletions(-) create mode 100644 .github/workflows/build-and-push-model-registry.yaml diff --git a/.github/workflows/build-and-push-model-registry.yaml b/.github/workflows/build-and-push-model-registry.yaml new file mode 100644 index 0000000..fde8cc9 --- /dev/null +++ b/.github/workflows/build-and-push-model-registry.yaml @@ -0,0 +1,49 @@ +name: Build and push - model registry image + +on: + workflow_dispatch: + +env: + PROJECT_DIR: "." + +jobs: + build-and-push-image: + name: Build and Push container image + runs-on: ubuntu-latest + strategy: + matrix: + include: + - name: rec-sys-model-registry + context: oc-tools + image-name: rec-sys-model-registry + + steps: + - name: Checkout repository + uses: actions/checkout@v3 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Log in to Quay.io + uses: docker/login-action@v3 + with: + registry: quay.io + username: ${{ secrets.QUAY_USERNAME }} + password: ${{ secrets.QUAY_PASSWORD }} + + - name: Set version from last commit hash + id: version + run: | + echo "tag=$(git rev-parse --short HEAD)" >> $GITHUB_OUTPUT + + - name: Build and push ${{ matrix.name }} + uses: docker/build-push-action@v5 + with: + context: ${{ matrix.context }} + file: ${{ matrix.context }}/Containerfile + push: true + tags: | + quay.io/rh-ai-kickstart/${{ matrix.image-name }}:${{ steps.version.outputs.tag }} + quay.io/rh-ai-kickstart/${{ matrix.image-name }}:latest + build-args: | + IMAGE_TAG=${{ steps.version.outputs.tag }} diff --git a/.github/workflows/build-and-push.yml
b/.github/workflows/build-and-push.yml index 7e0791d..b326a5c 100644 --- a/.github/workflows/build-and-push.yml +++ b/.github/workflows/build-and-push.yml @@ -1,4 +1,4 @@ -name: Build and push image +name: Build and push - pipeline image on: workflow_dispatch: @@ -31,7 +31,7 @@ jobs: username: ${{ secrets.QUAY_USERNAME }} password: ${{ secrets.QUAY_PASSWORD }} - - name: Set version from run number + - name: Set version from last commit hash id: version run: | echo "tag=$(git rev-parse --short HEAD)" >> $GITHUB_OUTPUT From b078ed322fe87a7e77f56e49a4f80156251c63ef Mon Sep 17 00:00:00 2001 From: Hadar Cohen Date: Thu, 31 Jul 2025 11:04:32 +0300 Subject: [PATCH 10/10] work with rh-ai-kickstart organization on quay.io --- train-workflow.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/train-workflow.py b/train-workflow.py index 2fb3755..1f1210e 100644 --- a/train-workflow.py +++ b/train-workflow.py @@ -5,7 +5,7 @@ from kfp.dsl import Artifact, Dataset, Input, Model, Output BASE_IMAGE = os.getenv( - "BASE_REC_SYS_IMAGE", "quay.io/ecosystem-appeng/rec-sys-app:latest" + "BASE_REC_SYS_IMAGE", "quay.io/rh-ai-kickstart/rec-sys-app:latest" ) @@ -310,7 +310,7 @@ def table_exists(engine, table_name): return modelMetadata(bucket_name, new_version, object_name, torch.__version__[0:5]) -@dsl.component(base_image="quay.io/ecosystem-appeng/model-registry:latest") +@dsl.component(base_image="quay.io/rh-ai-kickstart/rec-sys-model-registry:latest") def fetch_cluster_credentials() -> NamedTuple( "ocContext", [("author", str), ("user_token", str), ("host", str)] ):