Model Auto-Adjustment in an MLOps Pipeline

5 May 2021
In this article, we’ll implement automatic training.
Here we discuss the model creation, auto-adjustment and notifications of our CI/CD MLOps Pipeline.

In this series of articles, we’ll walk you through the process of applying CI/CD to AI tasks. You’ll end up with a functional pipeline that meets the requirements of level 2 in the Google MLOps Maturity Model. We’re assuming that you have some familiarity with Python, Deep Learning, Docker, DevOps, and Flask.

In the previous article, we set up a cloud environment for this project. In this one, we’ll walk you through the code required for continuous integration, model auto-training, auto-tweaking, and continuous delivery. The diagram below shows you where we are in our project process.

Image 1

We’ll show a condensed version of the code. For the full version, see this repository. We’ll use GCR Docker images for this project (TensorFlow-powered) – but feel free to use alternative ones.

First, we’ll discuss the code that runs these solutions locally. Later, we’ll see how to get ready for the cloud deployment.

The diagram below shows the file structure for our project.

Image 2

data_utils.py

The data_utils.py file handles data loading, transformation, and model saving to GCS. This file may vary from project to project. In essence, it performs all the data processing tasks before model training. Let’s have a look at the code:

Python
import datetime
from google.cloud import storage
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
import tensorflow as tf
import gc
from sklearn import preprocessing
import os
import zipfile
import cv2
import sys
 
def dataset_transformation(path):
 images = []
 for dirname, _, filenames in os.walk(path):
  for filename in filenames:
   if filename.endswith('.png'):
    image = cv2.imread(os.path.join(dirname, filename))
    image = cv2.resize(image, (128, 128))
    images.append(image)
 return images
 
def load_data(args):    
 file_1 = '/root/AutomaticTraining-Dataset/COVID_RX/normal_images.zip'
 file_2 = '/root/AutomaticTraining-Dataset/COVID_RX/covid_images.zip'
 file_3 = '/root/AutomaticTraining-Dataset/COVID_RX/viral_images.zip'
 extract_to = '/root/AutomaticTraining-Dataset/COVID_RX/'
 
 with zipfile.ZipFile(file_1, 'r') as zip_ref:
  zip_ref.extractall(extract_to)
    
 with zipfile.ZipFile(file_2, 'r') as zip_ref:
  zip_ref.extractall(extract_to)
 
 with zipfile.ZipFile(file_3, 'r') as zip_ref:
  zip_ref.extractall(extract_to)

 normal = dataset_transformation('/root/AutomaticTraining-Dataset/COVID_RX/normal_images')
 covid = dataset_transformation('/root/AutomaticTraining-Dataset/COVID_RX/covid_images')
 viral = dataset_transformation('/root/AutomaticTraining-Dataset/COVID_RX/viral_images')
 #Train and test - dataset combination (same class order as the labels below)
 X = normal + covid + viral
 #Transforming from list to numpy array.
 X = np.array(X)
 
 #Creating labels.
 y = []
 for i in range(len(normal)):
  y.append(0)
 for i in range(len(covid)):
  y.append(1)
 for i in range(len(viral)):
  y.append(2)
 y = np.array(y)

 #Dataset splitting
 X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0, shuffle = True)
 return X_train, X_test, y_train, y_test
 
def save_model(bucket_name, best_model):
 try:
  storage_client = storage.Client() #if running on GCP
  bucket = storage_client.bucket(bucket_name)
  blob1 = bucket.blob('{}/{}'.format('testing',best_model))
  blob1.upload_from_filename(best_model)
  return True,None
 except Exception as e:
  return False,e 
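
The labels only line up with the images if X and y are assembled in the same class order (normal, covid, viral). Below is a minimal sketch of one way to keep the two in lockstep. The helper name build_dataset and its signature are assumptions made for illustration and are not part of the original repository; short strings stand in for the image arrays.

```python
def build_dataset(images_by_class):
    """Flatten per-class image lists into parallel X and y lists.

    images_by_class is a list of (label, images) pairs. This helper is a
    hypothetical illustration, not part of the original data_utils.py.
    """
    X, y = [], []
    for label, images in images_by_class:
        X.extend(images)
        # One label per image, appended in the same order as the images.
        y.extend([label] * len(images))
    return X, y


# Placeholder strings stand in for the 128x128 image arrays.
normal = ['n1', 'n2']
covid = ['c1']
viral = ['v1', 'v2', 'v3']

X, y = build_dataset([(0, normal), (1, covid), (2, viral)])
print(X)
print(y)
```

Because images and labels are appended in the same loop, reordering the classes cannot desynchronize them.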

model_assembly.py

The model_assembly.py file contains the code for model creation and auto-tweaking. We’re looking to start with a very basic model – train it and evaluate it. If the initial model doesn’t reach the desired performance, we’ll introduce refinements until we reach our goal. Let’s take a look at the code:

Python
from tensorflow.keras.models import load_model
from tensorflow.keras import layers
import tensorflow as tf
import numpy as np
 
def get_base_model():
 input_img = layers.Input(shape=(128, 128, 3))
 x = layers.Conv2D(64,(3, 3), activation='relu')(input_img)
 return input_img,x

def get_additional_layer(filters,x):
 x = layers.MaxPooling2D((2, 2))(x)
 x = layers.Conv2D(filters, (3, 3), activation='relu')(x)
 return x
 
def get_final_layers(neurons,x):
 x = layers.SpatialDropout2D(0.2)(x)
 x = layers.Flatten()(x)
 x = layers.Dense(neurons)(x)
 x = layers.Dense(3)(x)
 return x

These functions are called in a loop. In the first iteration, we take the base model from get_base_model and stack the output of get_final_layers on top of it to build a very simple model. If that model doesn't perform well enough after training, we start again from the base model, add one block from get_additional_layer, stack the final layers, and train and evaluate once more. If performance is still not good enough, the loop repeats this process, adding one more additional block each time, until the model reaches the predefined target metric or the iteration limit set in task.py.
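
To see how the architecture grows from one iteration to the next, the sketch below tabulates the filter count and feature-map size of each convolution for a given loop counter. It is an illustration only: growth_schedule is a hypothetical helper, and it assumes the Keras defaults used in model_assembly.py (3x3 convolutions with 'valid' padding, 2x2 max pooling) together with the 64 * 2^counter filter rule that task.py applies.

```python
def growth_schedule(counter, input_size=128):
    """Return (filters, feature-map size) for each conv layer in one iteration.

    Assumes 3x3 'valid' convolutions (size - 2) and 2x2 max pooling
    (size // 2). Hypothetical helper, not part of the original code.
    """
    size = input_size - 2                 # conv in get_base_model
    schedule = [(64, size)]
    filters = 64 * 2 ** counter           # same width for every added block
    for _ in range(counter):              # one get_additional_layer per step
        size = size // 2 - 2              # max pooling, then 3x3 conv
        schedule.append((filters, size))
    return schedule


for counter in range(5):
    print(counter, growth_schedule(counter))
```

Under these assumptions the feature map is down to 4x4 after the fourth additional block, so a fifth block would leave nothing for another 3x3 'valid' convolution to work on. That is worth keeping in mind when choosing the iteration limit.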

email_notifications.py

The email_notifications.py file is in charge of delivering emails to the product owner via an SMTP server. These emails tell the owner if everything is well and, if not, what’s wrong.

Python
import smtplib
import os
 
# Email variables definition
sender = 'example@gmail.com'
receiver = ['svirahonda@gmail.com'] #replace this by the owner's email address
smtp_provider = 'smtp.gmail.com' #replace this by your SMTP provider
smtp_port = 587
smtp_account = 'example@gmail.com'
smtp_password = 'your_password'
 
def training_result(result,model_acc):
 if result == 'ok':
  message = 'The model reached '+str(model_acc)+'. It has been saved to GCS.'
 elif result == 'failed':
  message = 'None of the models reached an acceptable accuracy, training execution had to be forcefully ended.'
 message = 'Subject: {}\n\n{}'.format('An automatic training job has ended recently', message)
 try:
  server = smtplib.SMTP(smtp_provider,smtp_port)
  server.starttls()
  server.login(smtp_account,smtp_password)
  server.sendmail(sender, receiver, message)         
  return
 except Exception as e:
  print('Something went wrong. Unable to send email: '+str(e),flush=True)
  return
 
def exception(e_message):
 try:
  message = 'Subject: {}\n\n{}'.format('An automatic training job has failed.', e_message)
  server = smtplib.SMTP(smtp_provider,smtp_port)
  server.starttls()
  server.login(smtp_account,smtp_password)
  server.sendmail(sender, receiver, message)         
  return
 except Exception as e:
  print('Something went wrong. Unable to send email: '+str(e),flush=True)
  return
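
The functions above build the message as a raw string with a hand-written Subject line. As an alternative, the standard library's email.message.EmailMessage can assemble the headers for you. The sketch below only builds the message and does not send anything; build_message is a hypothetical helper and the addresses are placeholders.

```python
from email.message import EmailMessage


def build_message(sender, receivers, subject, body):
    """Assemble an email with proper headers (illustrative helper only)."""
    msg = EmailMessage()
    msg['From'] = sender
    msg['To'] = ', '.join(receivers)
    msg['Subject'] = subject
    msg.set_content(body)
    return msg


msg = build_message(
    'example@gmail.com',
    ['owner@example.com'],  # placeholder address
    'An automatic training job has ended recently',
    'The model reached 0.91. It has been saved to GCS.',
)
print(msg['Subject'])
print(msg.get_content())
```

A message built this way could then be passed to the send_message method of an smtplib.SMTP connection instead of calling sendmail with a raw string.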

task.py

The task.py file orchestrates the program. It initializes GPUs – if any are available – starts model training, and tweaks the model if required. It also receives the arguments passed to the app. Here is the code:

Python
import tensorflow as tf
from tensorflow.keras import Model, layers, optimizers
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.models import load_model
import argparse
import model_assembly, data_utils, email_notifications
import sys
import os
import gc
from google.cloud import storage
import datetime
import math
 
# general variables declaration
model_name = 'best_model.hdf5'
 
def initialize_gpu():
 if len(tf.config.experimental.list_physical_devices('GPU')) > 0:
  tf.config.set_soft_device_placement(True)
  tf.debugging.set_log_device_placement(True)
  return
 
def start_training(args):
 # Loading split data
 X_train, X_test, y_train, y_test = data_utils.load_data(args)
 # Initializing GPU if available
 initialize_gpu()
 train_model(X_train, X_test, y_train, y_test, args)
 
def train_model(X_train, X_test, y_train, y_test,args):
 try:
  model_loss, model_acc = [0,0]
  counter = 0
  while model_acc <= 0.85:
   input_img,x = model_assembly.get_base_model()
   if counter == 0:
    x = model_assembly.get_final_layers(64,x)
   else:
    for i in range(counter):
     x = model_assembly.get_additional_layer(int(64*(math.pow(2,counter))),x)
    x = model_assembly.get_final_layers(int(64*(math.pow(2,counter))),x)
   cnn = Model(input_img, x,name="CNN_COVID_"+str(counter))
   cnn.summary()
   cnn.compile(optimizer='adam', loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True), metrics=['accuracy'])
   checkpoint = ModelCheckpoint(model_name, monitor='val_loss', verbose=1,save_best_only=True, mode='auto', save_freq="epoch")
   cnn.fit(X_train, y_train, epochs=args.epochs, validation_data=(X_test, y_test),callbacks=[checkpoint])
   cnn = load_model(model_name)
   model_loss, model_acc = cnn.evaluate(X_test, y_test,verbose=2)
   if model_acc > 0.85:
    # Target reached: upload the checkpoint and notify the owner
    saved_ok = data_utils.save_model(args.bucket_name,model_name)
    if saved_ok[0]:
     email_notifications.training_result('ok',model_acc)
     sys.exit(0)
    else:
     email_notifications.exception(saved_ok[1])
     sys.exit(1)
   if counter >= 5:
    email_notifications.training_result('failed',None)
    sys.exit(1)
   counter += 1
 except Exception as e:
  email_notifications.exception('An exception when training the model has occurred: '+str(e))
  sys.exit(1)
 
def get_args():
 parser = argparse.ArgumentParser()
 parser.add_argument('--bucket-name', type=str, default = 'automatictrainingcicd-aiplatform', help = 'GCP bucket name')
 parser.add_argument('--epochs', type=int, default=3, help='Epochs number')
 args = parser.parse_args()
 return args
 
def main():
 args = get_args()
 start_training(args)
 
if __name__ == '__main__':
 main()
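
The two command-line flags can be tried out without starting a training run. The sketch below repeats the parser from task.py with one change made for illustration: get_args takes an optional argv list, which is an added parameter and not part of the original code. The bucket name my-bucket is a placeholder.

```python
import argparse


def get_args(argv=None):
    """Same flags as task.py; argv is an added parameter for easy testing."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--bucket-name', type=str,
                        default='automatictrainingcicd-aiplatform',
                        help='GCP bucket name')
    parser.add_argument('--epochs', type=int, default=3, help='Epochs number')
    return parser.parse_args(argv)


defaults = get_args([])
custom = get_args(['--epochs', '10', '--bucket-name', 'my-bucket'])
print(defaults.epochs, defaults.bucket_name)
print(custom.epochs, custom.bucket_name)
```

Note that argparse turns the hyphen in --bucket-name into an underscore, which is why the training code reads the value as args.bucket_name.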

Dockerfile

Our Dockerfile tells the Docker daemon how to build the container image. This is what it looks like:

Dockerfile
FROM gcr.io/deeplearning-platform-release/tf2-cpu.2-0
 
WORKDIR /root
 
RUN pip install pandas numpy google-cloud-storage scikit-learn opencv-python
RUN apt-get update; apt-get install git -y; apt-get install -y libgl1-mesa-dev
 
ADD "https://www.random.org/cgi-bin/randbyte?nbytes=10&format=h" skipcache
RUN git clone https://github.com/sergiovirahonda/AutomaticTraining-Dataset.git
ADD "https://www.random.org/cgi-bin/randbyte?nbytes=10&format=h" skipcache
RUN git clone https://github.com/sergiovirahonda/AutomaticTraining-CodeCommit.git
 
RUN mv /root/AutomaticTraining-CodeCommit/model_assembly.py /root
RUN mv /root/AutomaticTraining-CodeCommit/task.py /root
RUN mv /root/AutomaticTraining-CodeCommit/data_utils.py /root
RUN mv /root/AutomaticTraining-CodeCommit/email_notifications.py /root
 
ENTRYPOINT ["python","task.py"]

The above file uses the gcr.io/deeplearning-platform-release/tf2-cpu.2-0 image, installs the dependencies, clones the required repositories, moves the files to the main directory, and sets the entry point for the container execution.

Next Steps

In the next article of the series, we’ll deep-dive into the Continuous Training code. Stay tuned!

This article is part of the series 'Automatic Training, Testing, and Deployment of AI using CI/CD'.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
United States
Sergio Virahonda grew up in Venezuela, where he obtained a bachelor's degree in Telecommunications Engineering. He moved abroad 4 years ago and has since focused on building a meaningful data science career. He currently lives in Argentina, writing code as a freelance developer.
