A working example of how to use the Snowpipe REST API to load a file into a table. This is a small tutorial on how to connect to Snowflake and how to use Snowpipe to ingest files into Snowflake tables. For the purpose of delivering this tutorial to our dear readers, we opened a free trial account with Snowflake. The lesson consists of three steps: first, loading data from your computer into an internal stage; second, loading data manually from an S3 bucket through an external stage; and finally, loading data into an external stage using an AWS Lambda function.

Why do we do it this way? Why are we not loading it automatically from S3 buckets into Snowflake tables?

The reason behind this process is to better understand how S3 buckets and Snowflake tables can work together in harmony, so you can get more done with less work: minimum effort, maximum satisfaction. But before we start playing with Snowflake and S3, let’s check the requirements.

Requirements

  1. Access to S3 buckets through an AWS account.
  2. Access to the Snowflake console through a Snowflake account.
  3. Access to EC2 instances through an AWS account.

First Step: Loading Data from an Internal Stage

In this section of the document you prepare the environment inside Snowflake so it is ready for creating tables, stages, and pipes. By following these statements you will be able to make it happen:

  • Define a connection named zaid_connection, with your user, account, and password, inside the file ~/.snowsql/config.
  • Run this command in your terminal:
snowsql -c zaid_connection -o log_level=DEBUG  
  • Generate the public and private key pair following the section Configure Security (Per User) inside this tutorial.
  • Run the next command inside Snowflake’s worksheet, which is Snowflake’s web console:
alter user almalikz set RSA_PUBLIC_KEY="your_public_key"  
  • Run the next command in the same worksheet and check that the field RSA_PUBLIC_KEY_FP is not empty.
desc user almalikz 
  • Create a new user
 create user zaid_user password='your_password';  
  • Assign public key to the new user

alter user zaid_user set rsa_public_key='your_public_key';  

  • Create a role to contain the Snowpipe privileges
use role securityadmin;
create or replace role snowpipe_s3;
grant usage on warehouse compute_wh to role snowpipe_s3;
grant usage on database demo_db to role snowpipe_s3;
grant usage on schema demo_db.public to role snowpipe_s3;
grant insert, select on demo_db.public.TABLE_S3 to role snowpipe_s3;
grant read on stage demo_db.public.STAGE_S3 to role snowpipe_s3;
grant ownership on pipe demo_db.public.PIPE_S3 to role snowpipe_s3;
grant role snowpipe_s3 to user zaid_user;
alter user zaid_user set default_role = snowpipe_s3;
  • Run the following commands to create a table, create a stage, upload a file into Snowflake’s internal staging area, and create a pipe. All these statements serve the goal of ingesting data into Snowflake tables.
create or replace table TABLE_S3(c1 number, c2 string);
create or replace stage STAGE_S3;  
put file:///data/test_file.csv @STAGE_S3;  
create or replace pipe demo_db.public.PIPE_S3 as copy into TABLE_S3 from @STAGE_S3;

So we created the program simple_ingest_snowflake.py to access Snowflake. It uses the user, account, host, port, pipe_name, and private key in PEM format; check these parameters in the fragment below:


ingest_manager = SimpleIngestManager(account='by16910',
                                     user='almalikz',
                                     private_key=private_key_pem,
                                     pipe=pipe_name,
                                     scheme='https',
                                     host='by16910.eu-west-1.snowflakecomputing.com',
                                     port=443)
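
The snippet above leaves two names undefined: private_key_pem and pipe_name. Here is a minimal sketch of how they can be prepared, assuming the private key generated earlier was saved unencrypted as rsa_key.p8 (the path is an assumption), and of how the staged file is then submitted to the pipe:

from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend
from snowflake.ingest import SimpleIngestManager, StagedFile

# Load the unencrypted PKCS#8 private key and re-serialize it as PEM text.
with open('/data/rsa_key.p8', 'rb') as key_file:  # path is an assumption
    private_key = serialization.load_pem_private_key(key_file.read(),
                                                     password=None,
                                                     backend=default_backend())
private_key_pem = private_key.private_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption()).decode('utf-8')

pipe_name = 'demo_db.public.PIPE_S3'

# After building ingest_manager as shown above, submit the staged file to the pipe.
# PUT compresses the file, so the staged name carries a .gz suffix.
resp = ingest_manager.ingest_files([StagedFile('test_file.csv.gz', None)])
print(resp['responseCode'])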

In the python program mentioned above, we used these parameters to build a JWT. Let’s begin with an abstract definition.

A JSON Web Token (JWT) is a JSON object that is defined in RFC 7519 as a safe way to represent a set of information between two parties. The token is composed of a header, a payload, and a signature.

Mikey Stecky-Efantis

In Snowflake, JWT generation is pre-built into the python libraries that the Snowflake API provides (and which are documented in the Snowflake docs), so ideally we simply write a small script that uses these libraries, through the python code of the simple-ingest-snowflake wrapper, to take care of JWTs for us automatically.
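
Just for intuition, and not as part of the tutorial’s code, here is a rough sketch of what building such a token yourself with the PyJWT library could look like. The exact claim values Snowflake expects (account, user, and public key fingerprint) are an assumption here; the ingest SDK normally fills them in for you.

# Hypothetical illustration only; the snowflake-ingest SDK builds this token for us.
import datetime
import jwt  # pip install pyjwt[crypto]

now = datetime.datetime.utcnow()
payload = {
    'iss': 'BY16910.ALMALIKZ.SHA256:<public_key_fingerprint>',  # issuer (assumed format)
    'sub': 'BY16910.ALMALIKZ',                                  # subject (assumed format)
    'iat': now,                                                 # issued at
    'exp': now + datetime.timedelta(minutes=59),                # short lifetime
}
token = jwt.encode(payload, private_key_pem, algorithm='RS256')  # signed with the private key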

After running the program for a couple of seconds we succeeded in ingesting the file from the internal staging area into the Snowflake table, and you can check in the following logs that everything went smoothly.

DEBUG:botocore.vendored.requests.packages.urllib3.connectionpool:"POST /v1/data/pipes/demo_db.public.PIPE_S3/insertFiles?requestId=b9f27491-6434-4c5f-93b6-c6831afae12d HTTP/1.1" 200 88
DEBUG:snowflake.ingest.simple_ingest_manager:Ingest response: {'requestId': 'b9f27491-6434-4c5f-93b6-c6831afae12d', 'responseCode': 'SUCCESS'}

If you check the log above you can see that, first of all, it is a POST operation and, second, the response code indicates success, which is positive feedback.

 'pipe': 'DEMO_DB.PUBLIC.PIPE_S3', 'completeResult': True, 'nextBeginMark': '1_0', 'files': [
 {'path': 'test_file.csv.gz', 'stageLocation': 'stages/b3767d50-e0f8-4bb2-a52a-6677122c8223/', 'fileSize': 64, 'timeReceived': '2019-05-14T13:48:30.725Z', 'lastInsertTime': '2019-05-14T13:49:00.579Z', 'rowsInserted': 4, 'rowsParsed': 4, 'errorsSeen': 0, 'errorLimit': 1, 'complete': True, 'status': 'LOADED'}

This second log, however, comes from a GET operation invoked by an expression inside our code to check the file size, the time received, the errors seen, whether the load completed, and the status, which in our case was LOADED.
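
That GET is issued by the SDK’s history call. A minimal polling sketch (not the tutorial’s exact code, so treat the loop details as an assumption) looks like this:

import time

# Poll the ingest history until the submitted file shows up as complete.
while True:
    history = ingest_manager.get_history()       # the GET request seen in the log above
    files = history.get('files', [])
    if files and all(f.get('complete') for f in files):
        for f in files:
            print(f['path'], f['status'], f['rowsInserted'], 'rows inserted')
        break
    time.sleep(5)                                # wait a bit before polling again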

Second Step: Loading Data from S3 External Stage

In this section, we talk about how to ingest the data using Snowflake’s new auto-ingest feature and notification events. We succeeded in streaming the data from S3 buckets to Snowflake tables, but we did it manually. Why manually? Because our free account does not have the auto-ingest option activated, which is why we did it without notifications and auto-ingest. We accomplish this step through the following tasks:

First Task: Set up your AWS account

For this task, you must follow AWS security good practices, which take into consideration the following rules:

  1. Delete the root password.
  2. Create an IAM user.
  3. Create a new group.
  4. Define a new IAM password policy. 

Second Task: Change the warehouse to support auto-resume mode

We have two types of users: the administrator user almalikz and the normal user zaid_user. This task can only be performed with the administrator user:

alter warehouse compute_wh resume;

Third Task: Loading the data into the Snowflake table

In this task, we used the normal user account, zaid_user, instead of the administrator user.

  • Create the stage
create or replace stage s3_stage
url= 's3://outputzaid/'
credentials = (AWS_KEY_ID = 'your_key' AWS_SECRET_KEY='your_secret');
  • Create the table
 create or replace table s3_table(c1 number, c2 string);  
  • Load data into the table
 copy into s3_table
from 's3://outputzaid/test_file_trusted.csv' credentials=(AWS_KEY_ID = 'your_key' AWS_SECRET_KEY='your_secret')
file_format = (type = csv field_delimiter = ',' );
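
If you prefer to run the same COPY from python instead of the worksheet, a minimal sketch using the Snowflake Connector for Python could look like the following; the connection parameters are the same assumptions used earlier in this tutorial:

import snowflake.connector

# Connect as the normal user; account, warehouse, and database follow the earlier examples.
conn = snowflake.connector.connect(user='zaid_user',
                                   password='your_password',
                                   account='by16910.eu-west-1',
                                   warehouse='compute_wh',
                                   database='demo_db',
                                   schema='public')
try:
    conn.cursor().execute("""
        copy into s3_table
        from 's3://outputzaid/test_file_trusted.csv'
        credentials = (AWS_KEY_ID = 'your_key' AWS_SECRET_KEY = 'your_secret')
        file_format = (type = csv field_delimiter = ',')
    """)
finally:
    conn.close()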

Third Step: Loading Data from S3 Stage using Lambda Function

In this section, we will guide you through the process of automating Snowpipe with AWS Lambda functions.

AWS Lambda is an event-driven, serverless computing platform provided by Amazon as part of Amazon Web Services. It runs when triggered by an event and executes code that has been loaded into the system. You can adapt the sample python code provided in this topic and create a Lambda function that calls the Snowpipe REST API to load data from your S3 external stage. The function is deployed to your AWS account, where it is hosted. Events you define in Lambda (e.g. when files in your S3 bucket are updated) invoke the Lambda function and run the python code.

This section describes the steps necessary to configure a Lambda function to automatically load data in micro-batches continuously using Snowpipe. So let’s get started with the following tasks:

First Task: Accessing your AWS Instance

  1. Access your EC2 micro instance and check the information about your-ssh-key-name.
  2. Create and run an EC2 instance using the AWS console web wizard.
  3. Download the SSH key.
  4. Connect to the instance using the ssh command:
 ssh -i "zaid_key.pem" ubuntu@ec2-3-91-26-48.compute-1.amazonaws.com  

Second Task: Create the Lambda Execution Environment

In order to complete this guide, you first need to create a deployment package containing your code and its dependencies. An easy way to do this is to grab a t2.micro instance and set up an execution environment that mimics the one the python Lambda will run in. This is the step you have to do on your EC2 micro instance, with the following commands:

sudo apt-get install gcc make zip awscli libffi-dev openssl libssl-dev zlib1g-dev 
wget https://www.python.org/ftp/python/3.6.1/Python-3.6.1.tgz  
tar -xzvf Python-3.6.1.tgz 
cd Python-3.6.1 && ./configure && make  
sudo make install  
sudo pip3 install --upgrade pip  
sudo pip3 install virtualenv   # needed so the virtualenv command below exists
/usr/local/bin/virtualenv ~/zaid_venv  
source zaid_venv/bin/activate  
pip install Pillow  
pip install boto3  
pip install requests  
pip install snowflake-ingest   

Third Task: Create the Python Deployment Package

  1. Edit the Lambda handler function. Create the file lambda_function.py with a basic Lambda handler function that uses the Snowflake Connector for python. Adjust the connection variables, table name, and S3 bucket to match your environment (a rough sketch of such a handler is shown after this list).

  2. Create and package the python virtual environment. The script package_lambda_function.sh contains the commands that will create and package up the python environment for your Lambda function. Be careful here: if you change the name of the file containing the Lambda handler, you will also have to modify this script.

  3. Copy the code to the AWS EC2 instance using:
 scp -i keys/zaid_key.pem to_tar.tar.gz ubuntu@ec2-35-168-21-113.compute-1.amazonaws.com:/home/ubuntu/to_tar.tar.gz

    After running package_lambda_function.sh, it will generate the file lambda_function.zip, which contains your python Lambda handler file as well as the python dependencies needed to run in the AWS Lambda environment.

  4. Create the stage, table, and pipe in Snowflake:
  create or replace stage s3_stage url= 's3://outputzaid/' credentials = (AWS_KEY_ID = 'your_key' AWS_SECRET_KEY='your_secret');  
    create or replace table s3_table(c1 number, c2 string);  
    create or replace pipe s3_pipe as copy into s3_table from @s3_stage file_format = (type = 'CSV');
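
As mentioned above, the tutorial’s lambda_function.py is not reproduced in this post, so the sketch below is only a rough outline of such a handler. It reuses the snowflake-ingest wrapper from earlier in the tutorial rather than the connector, and the environment variable used to inject the private key is an assumption:

import os
from snowflake.ingest import SimpleIngestManager, StagedFile

# The private key is assumed to be injected into the function, e.g. as an environment variable.
PRIVATE_KEY_PEM = os.environ['SNOWFLAKE_PRIVATE_KEY']

ingest_manager = SimpleIngestManager(account='by16910',
                                     user='zaid_user',
                                     private_key=PRIVATE_KEY_PEM,
                                     pipe='demo_db.public.s3_pipe',
                                     scheme='https',
                                     host='by16910.eu-west-1.snowflakecomputing.com',
                                     port=443)

def lambda_handler(event, context):
    # Each record in the S3 event describes one object that landed in the bucket.
    staged_files = [StagedFile(record['s3']['object']['key'], None)
                    for record in event['Records']]
    response = ingest_manager.ingest_files(staged_files)
    return response['responseCode']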

 Fourth Task: Create and Configure the AWS Lambda Permissions

  1. Run aws configure and enter your credentials and your default region.
  2. In order to execute the Lambda, you need to use the aws command to create and set the appropriate permissions for the roles and policies and then upload the zip archive containing our python environment. These commands can be found in the file aws_lambda_setup.sh.
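
The contents of aws_lambda_setup.sh are not shown in this post. As a hedged equivalent, roughly the same setup can be sketched with boto3, where the role name, attached policy, and handler name are assumptions:

import json
import boto3

iam = boto3.client('iam')
aws_lambda = boto3.client('lambda')

# Role the Lambda function will assume; the role name is an assumption.
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [{"Effect": "Allow",
                   "Principal": {"Service": "lambda.amazonaws.com"},
                   "Action": "sts:AssumeRole"}],
}
role = iam.create_role(RoleName='snowpipe_lambda_role',
                       AssumeRolePolicyDocument=json.dumps(trust_policy))
iam.attach_role_policy(RoleName='snowpipe_lambda_role',
                       PolicyArn='arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole')

# Upload the zip archive produced in the previous task as a new Lambda function.
# (In practice the new role may need a short time to propagate before this call succeeds.)
with open('lambda_function.zip', 'rb') as archive:
    aws_lambda.create_function(FunctionName='lambda_function',
                               Runtime='python3.6',
                               Role=role['Role']['Arn'],
                               Handler='lambda_function.lambda_handler',
                               Code={'ZipFile': archive.read()})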

Fifth Task: Test the Lambda Function using Test Files

At this point, you can test-fire the Lambda and see the output in the AWS Web Console UI. Navigate to Lambda > Functions > lambda_function.

You should now see a list-box in the upper right where you can configure a test: choose a name and put the following statements inside the test box.

{
  "Records": [
    {
      "eventVersion": "2.0",
      "eventSource": "aws:s3",
      "awsRegion": "us-east-1",
      "eventTime": "1970-01-01T00:00:00.000Z",
      "eventName": "ObjectCreated:Put",
      "userIdentity": {
        "principalId": "AIDAJDPLRKLG7UEXAMPLE"
      },
      "s3": {
        "s3SchemaVersion": "1.0",
        "configurationId": "testConfigRule",
        "bucket": {
          "name": "outputzaid",
          "ownerIdentity": {
            "principalId": "A3NL1KOZZKExample"
          },
          "arn": "arn:aws:s3:::sourcebucket"
        },
        "object": {
          "key": "test_file_lambda.csv",
          "size": 1024,
          "eTag": "d41d8cd98f00b204e9800998ecf8427e",
          "versionId": "096fKKXTRTtl3on89fVO.nfljtsv6qko"
        }
      }
    }
  ]
}

CONCLUSION

You accomplished the mission, my dear friend. Well done!
You have created a Lambda function to stream data from S3 buckets to Snowflake tables, and this is a fantastic first step on your way to becoming a Data Engineer!
I have been creating quite a few tutorials to show you how to work with streaming data. All of these start from the very basics, and we will publish more articles in the coming weeks. So keep in touch with us, and discover an easy way to learn streaming data.

Cheers,
Zaid Alissa Almaliki