Click here to Skip to main content
15,887,027 members
Articles / Hosted Services / Google Cloud

GCloud - Cloud Functions with Cloud Storage, PubSub and DataStore

Rate me:
Please Sign up or sign in to vote.
0.00/5 (No votes)
21 Jan 2019CPOL4 min read 5K   2  
A scenario where 4 Google Cloud services are put together for a solution, detailing the integration between these

This blog details a scenario where 4 Google Cloud services are put together for a solution, detailing the integration between these. Now before you start, this whole exercise can be done with a single loop in python or any programming language, but that is not the point. As a learning exercise, you will know:

  • How to trigger PubSub topic from storage
  • How to develop, deploy and trigger serverless function using PubSub
  • How to use Datastore in serverless environment
  • Using transactions in Datastore
  • Utilizing Storage client libraries in Google Functions

Scenario

BSE sensex (India's stock exchange) provides end of day data in a single zip file every day. This blog uses 4 Google services to download all the files from the start day (that user provides). So if I trigger it with 010118, it should download the data for whole year till today. If the data is 31-Oct, then from 1st Nov till today.

Solution

  1. The loop is triggered by placing first file in cloud storage, say EQ010118.csv
  2. Storage notifies pubsub, which in turns trigger the function to execute with message payload - 010118
  3. Function adds a day the message and downloads another file into Google storage
  4. This triggers step1 with payload 020118 and soon
  5. Datastore is used to store file metadata (in this case, this is only filename). Specific purpose is explained with the code below.

Deploy and Execute

Pre-requisite

  1. Create Google cloud account
  2. Install google cloud sdk. I am on ubuntu 16, python 2.7.
  3. Initialize your account - "gcloud init"

Create Project

gcloud projects create bhavcopy --set-as-default

Create and Set Service Account

Here 3rd command sets the shell to use our new project for resource deployments. Change the home directory name as per your environment.

gcloud iam service-accounts create bhavcopy-servact --display-name bhavcopy-servact
gcloud iam service-accounts keys create ~/gc-serv-accs/bhavcopy-servact.json 
                 --iam-account bhavcopy-servact@bhavcopy.iam.gserviceaccount.com
export GOOGLE_APPLICATION_CREDENTIALS=/home/kamal/gc-serv-accs/bhavcopy-servact.json
gcloud projects add-iam-policy-binding bhavcopy 
 --member serviceAccount:bhavcopy-servact@bhavcopy.iam.gserviceaccount.com --role roles/editor

Create Storage Bucket

gsutil mb gs://bhavcopy-store

Create pubsub Topic and Notification

The notification is from cloud storage to pubsub and is set only for object creation (FINALIZE).

gcloud pubsub topics create bhavcopy-messages<br />gsutil notification create  
-t bhavcopy-messages -f json -e OBJECT_FINALIZE gs://bhavcopy-store

Enable APIs

  1. Login to Google console and select your new project - bhavcopy
  2. Now go to cloud functions (under compute), if it is the first time, then you will be asked to enable the API.
  3. Go to Datastore, you will be asked to enable either Datastore or Firestore as datastore. Both are ok, I am using firestore as datastore.

Deploy Function

Download the code from my git repository and deploy it to cloud. You do not need to change anything if you have used same name of resources as above. Yes, it's hardcoded. The code has two files:

  • main.py - This is the file where gcloud function service looks into for function definition (for python runtime)
  • requirements.txt - This file provides list of dependent packages.

Here, we are defining "download_bhavcopy" as our main function call for the program.

git clone https://github.com/skamalj/gcloud-functions.git
cd gcloud-functions/bhavcopy-download
gcloud functions deploy download_bhavcopy 
   --trigger-topic bhavcopy-messages --source .  --runtime python37

Execute/Trigger the Function

We initiate the loop by sending first file to the storage bucket. Below trigger will download files from 2nd January onward.

touch EQ010118.csv
gsutil cp EQ010118.csv gs://bhavcopy-store

Re-run

To re-run the code, there is no need to delete the files from storage. But you will need to delete Entity "RecievedFiles" from datastore or delete all entries from it.

Code Explanation

The first function we look at is the "check_if_already_stored" function. This function solves a particular problem where pubsub sends multiple signals to functions for one trigger from storage. Pubsub is atleast once delivery, that means message can be send multiple times and your function will end up executing multiple times for the same date.

You can test this fact by removing this condition from the main program and enabling versioning on storage bucket.

By using transaction in line 4, the code ensures that only one process gets to create the metadata entry in database. Return value from this function tells the main program whether the process is duplicate or not.

Downside of this implementation is that if the process which got the lock crashes, there is no other process to do the job.

def check_if_already_stored(fname):
    already_downloaded = True
    client = datastore.Client(project='bhavcopy')
    with client.transaction():
        key = client.key('RecievedFiles', fname)
        result = client.get(key)
        if result is None:
            entity = datastore.Entity(key=key)
            entity['file_name'] = fname
            client.put(entity)
            already_downloaded = False
            logging.info('Created new entry for ' + fname)
        else:
            logging.info('Received duplicate message for ' + fname)
        return already_downloaded

The code also used datastore to store holiday list which is read at runtime to verify if or not data will be available from the website. This is not used in the current demo but you can fill the data in this table ex. 02/03/18, and the function will not download file for that date.

def create_holiday_dict():
    holiday_dict = {}
    client = datastore.Client(project='bhavcopy')
    query = client.query(kind='Trading-Holidays')
    for row in query.fetch():
        holiday_date_strf = row['Date'].strftime('%d%m%y')
        holiday_dict[holiday_date_strf] = holiday_date_strf
    return holiday_dict

The last piece of code to look at is below. One point to know is that cloud function can only download any file to /tmp mount. That is the only one available to functions.

Here again, we use storage client library to reference our project and bucket and store the file in there - lines 6,7,8.

file_downloaded_locally , new_fname = check_and_download(new_date,holiday_dict,base_url)
    try:
        if file_downloaded_locally and (not check_if_already_stored(new_fname)):  
            client = storage.Client(project='bhavcopy')
            bucket = client.get_bucket('bhavcopy-store')
            blob = Blob(new_fname, bucket)
            with open('/tmp/'+new_fname, 'rb') as my_file:
              blob.upload_from_file(my_file) 
    except Exception as e:
        logging.info('Not Downloaded: 
              Cloud function exiting without storing file for date: '+ str(new_date) +
                    '.Received error: ' + str(e))

Next, we will extend this example to load all the data, as it arrives, into datastore table with bulk load.

One of the issues you will hit if trying to download too many files is 409 too much contention on these datastore entities. Please try again. The solution for this is sharding our datastore entity into multiple parts. This is not implemented here, pick it up as an exercise - may be create 4 entities RecievedFiles[0-3] - each one for day % 4.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)



Comments and Discussions

 
-- There are no messages in this forum --