Python x Snowflake Multithreaded Data Upload: Working with SSO and Splitting Large Zip Files

By: Jess Brown, RXA Data Engineer

The volume of data available for measuring marketing effectiveness, including the short- and long-term effects of marketing investment, has greatly increased the size of datasets used for statistical analysis and complicated the connections between platforms. As a leader in Growth Marketing Technology, RXA (RXA.IO) continues to develop solutions that deliver on the expectation of performance optimization, accountability, and demonstrable financial benefits through SaaS or custom, data-driven, and channel-agnostic Media Mix Modeling (MMM) analysis.

Solution for uploading large datasets from Python to Snowflake

Python allows for fast processing of large datasets. Snowflake is great for long-term data storage and access. But getting 2.1GB compressed (41GB uncompressed) .zip files from a cloud drive or local machine into Snowflake can be tricky for more than one reason: Snowflake does not natively handle the upload of .zip files, the recommended maximum file upload size for the Python/Snowflake connector is 100MB when using parallel processing, and there may be memory issues with trying to open the entire file locally (or on a server) to chunk it.

The RXA Data Engineering team has developed a solution to handle the transfer of large .zip files to a Snowflake table using Single Sign-On (SSO) authentication:

Getting Started

First, make sure you are using Python version 3.6+. Then install and import all the necessary libraries. The credentials file is named snowflake_credentials.py, and the template can be found in the project Git folder.

import datetime as dt
import math
import os
import threading
import time
import zipfile as zf

import pandas as pd # pandas==1.0.1
import snowflake.connector # snowflake-connector-python==2.2.5

# Credential file: snowflake_credentials.py
from snowflake_credentials import creds

Next, define a SnowConnector class to connect to your Snowflake instance and access tables within your database. The full code can be seen on GitHub.

class SnowConnector():
    def __init__(self, creds):
        # set variables from your creds file
        return

    # define variable access functions

    def snowflake_cursor(self):
        e_counter = 0
        conn = None

        # retry the connection up to 5 times, waiting 5 seconds between attempts
        while e_counter < 5:
            try:
                conn = snowflake.connector.connect(
                    user=self.user,
                    authenticator='externalbrowser',
                    account=self.account,
                    warehouse=self.warehouse,
                    database=self.database,
                    schema=self.schema
                )
                break
            except snowflake.connector.errors.DatabaseError as e:
                print(e)
                print('Connection to Snowflake refused, trying again...')
                e_counter += 1
                time.sleep(5)
        if not conn:
            print("""\n*****\n
            Connection to Snowflake refused after 5 attempts.
            Please check connection credentials and
            connection to server/internet.
            \n*****\n""")
            exit(1)
        print("Connected to Snowflake")
        return conn

    # define table checking and creation/replacing functions

Note: the authenticator='externalbrowser' argument is necessary for SSO authentication and will open a tab in your default browser to confirm that you are logged in. The “user” value in the snowflake_credentials.py file must match the account that you are logged into in your browser.
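The class above leaves out the constructor body and the accessor methods. As a rough sketch of what they might look like, assuming creds is a plain dictionary: the class name SnowSettings, every key name, and the extension-based file type check are hypothetical, chosen only to line up with the getters used later in this walkthrough. The real template in the project Git folder may be organized differently.

```python
class SnowSettings:
    """Hypothetical stand-in for the elided parts of SnowConnector."""

    def __init__(self, creds):
        # Assumed key names; adjust them to match your own credentials file.
        self.user = creds["user"]
        self.account = creds["account"]
        self.warehouse = creds["warehouse"]
        self.database = creds["database"]
        self.schema = creds["schema"]
        self.table = creds["table"]
        self.infile = creds["infile"]
        # Optional value; falls back to the 1 million default mentioned later.
        self.rows_to_read = creds.get("rows_to_read", 1_000_000)

    def get_table(self):
        return self.table

    def get_infile(self):
        return self.infile

    def get_filetype(self):
        # Assumption: derive the type from the extension, "data.zip" -> "zip".
        return self.infile.rsplit(".", 1)[-1].lower()

    def get_rows_to_read(self):
        return self.rows_to_read


example_creds = {
    "user": "someone@example.com",
    "account": "my_account",
    "warehouse": "my_warehouse",
    "database": "my_database",
    "schema": "my_schema",
    "table": "my_table",
    "infile": "exports/data.ZIP",
}
settings = SnowSettings(example_creds)
print(settings.get_filetype(), settings.get_rows_to_read())
```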

Splitting Your File

A note before discussing the splitting of files: this can also be done through command line (on Linux/MacOS) or through third-party software. I elected to do it programmatically, as I was constrained by my available access permissions and software. If you find a faster way to chunk files from a .zip format within Python, please let me know!

Creating a class and function to split up the .zip file allows us to run multiple threads at the same time, reducing the file chunking time by 84%* over a non-threaded approach. Python’s built-in threading.Thread class is subclassed so that each thread carries our own variables:

*based on my own testing and profiling, with a 2.1GB compressed/41GB uncompressed csv file

class ZipToGzThread(threading.Thread):
    def __init__(self, thread_id, skip, rows_to_read,
                 filename, header, dtypes,
                 staging_folder, zipname=None):
        threading.Thread.__init__(self)
        self.threadID = thread_id
        self.skip = skip
        self.rows_to_read = rows_to_read
        self.filename = filename
        self.header = header.columns
        self.dtypes = dtypes
        self.staging_folder = staging_folder
        self.zipname = zipname

    def run(self):
        chunk_file(self.skip, self.rows_to_read, self.filename,
                   self.header, self.dtypes, self.threadID,
                   self.staging_folder, self.zipname)

This program can handle .zip, .csv, and .txt files. If there is no zipname, .csv or .txt formats are assumed, and the file is read and chunked appropriately. The split files are compressed into a gzip format, as Snowflake can natively decompress these files.

def chunk_file(skip, rows_to_read, filename,
               header, dtypes, threadID,
               staging_filepath, zipname):
    # read only this thread's slice of rows from the source file
    if zipname:
        temp = pd.read_csv(zipname, skiprows=skip,
                           nrows=rows_to_read, dtype=dtypes,
                           names=header, header=None, engine='c')
    else:
        temp = pd.read_csv(filename, skiprows=skip,
                           nrows=rows_to_read, dtype=dtypes,
                           names=header, header=None, engine='c')
    print(f'Read {len(temp)} lines starting at row {skip}')

    # write the slice to the local staging folder as a gzip-compressed csv
    temp.to_csv(f'{staging_filepath}/{filename}_{threadID}.gz',
                index=False, header=True,
                compression='gzip', chunksize=1000)
    print(f'Sent thread ID {threadID} to local staging folder as GZIP')

With this method, we take advantage of two parameters of pandas’ read_csv function, skiprows and nrows, which let us read distinct row chunks anywhere in the file while keeping the rows_to_read value constant. This will make more sense further down in the walkthrough.
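To make the skiprows/nrows idea concrete, here is a small illustration on an in-memory CSV rather than a real file. The sample data and the read_chunk helper are made up for this example. Each call skips a different number of leading lines but asks for the same number of rows.

```python
import io

import pandas as pd

# Tiny stand-in for a large file: one header line plus five data rows.
CSV_TEXT = "id,name\n1,a\n2,b\n3,c\n4,d\n5,e\n"
COLUMN_NAMES = ["id", "name"]


def read_chunk(text, skip, rows_to_read):
    """Read rows_to_read rows after skipping the first `skip` lines."""
    return pd.read_csv(io.StringIO(text), skiprows=skip, nrows=rows_to_read,
                       names=COLUMN_NAMES, header=None, dtype=str)


first = read_chunk(CSV_TEXT, skip=1, rows_to_read=2)   # skips the header only
second = read_chunk(CSV_TEXT, skip=3, rows_to_read=2)  # skips header + 2 rows
third = read_chunk(CSV_TEXT, skip=5, rows_to_read=2)   # only one row is left

print(first["id"].tolist(), second["id"].tolist(), third["id"].tolist())
```

The last chunk is simply shorter than the others, which is why rows_to_read can stay constant across all threads.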

Initializing Your Transfer

Once the files are chunked in a local staging folder and ready to be transferred to the Snowflake table stage, we must create a list of SQL statements, one for each individual thread. Again, we subclass threading.Thread with our own variables:

class SfExecutionThread(threading.Thread):
    def __init__(self, thread_id, sql_query):
        threading.Thread.__init__(self)
        self.threadID = thread_id
        self.sql_query = sql_query

    def run(self):
        print('Starting {0}: {1}'.format(self.threadID, self.sql_query))
        execute_in_snowflake(self.sql_query)
        print('Exiting {0}: {1}'.format(self.threadID, self.sql_query))


def execute_in_snowflake(sf_query):
    # connect to snowflake
    temp_snow = SnowConnector(creds)
    conn = temp_snow.snowflake_cursor()

    # increase timeout
    conn.cursor().execute("""ALTER SESSION SET
        STATEMENT_TIMEOUT_IN_SECONDS = 86400""")

    conn.cursor().execute(sf_query)
    conn.close()

Now that all our Snowflake connecting, file splitting, and SQL creating functions are defined, we can run the main program. The first step is to create an instance of the SnowConnector class:

snow = SnowConnector(creds)
snow.print_conn_info()
filetype = snow.get_filetype()
table_name = snow.get_table()

One of the biggest issues the RXA team encountered when chunking a zip file with threading was figuring out how many iterations are needed to read in the whole file. With a 41GB file, reading the whole dataset into memory just to count its rows is not practical. We used this trick (found here) to count the rows in a .zip file without storing anything other than the counter value:

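if filetype == 'zip':
    zipname = snow.get_infile()
    filename = snow.get_file_within_zip()

    # stream the zipped file line by line; only the counter is kept in memory
    with zf.ZipFile(zipname) as folder:
        with folder.open(filename) as f:
            row_count = 0
            for _ in f:
                row_count += 1
    header = pd.read_csv(zipname, nrows=1)
else:
    # assumption: for .csv/.txt input, the input file is the data file itself
    filename = snow.get_infile()
    with open(filename) as f:
        row_count = 0
        for _ in f:
            row_count += 1
    header = pd.read_csv(filename, nrows=1)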

This can also be used for large .csv/.txt files. We also gather the header information from the zip files at this time for later use.
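For readers who want to try the counting trick in isolation, the following sketch builds a tiny archive in a temporary folder and counts the lines of one member by streaming it. The file names and the count_lines_in_zip helper are made up for the demonstration.

```python
import os
import tempfile
import zipfile


def count_lines_in_zip(zip_path, member_name):
    """Count lines in one member of a zip archive, streaming it line by line."""
    row_count = 0
    with zipfile.ZipFile(zip_path) as archive:
        with archive.open(member_name) as member:
            for _ in member:
                row_count += 1
    return row_count


# Build a small archive to demonstrate; nothing here touches real data.
with tempfile.TemporaryDirectory() as tmp:
    demo_zip = os.path.join(tmp, "demo.zip")
    with zipfile.ZipFile(demo_zip, "w", zipfile.ZIP_DEFLATED) as archive:
        archive.writestr("demo.csv", "id,name\n1,a\n2,b\n3,c\n")
    total_lines = count_lines_in_zip(demo_zip, "demo.csv")

print(total_lines)  # counts the header line plus the three data rows
```

Note that the count includes the header line, which matters when turning it into a number of chunks.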

Threading Your Transfer

To see the number of “read file” threads we need to run, the following calculations are made:

# calculate number of iterations needed for file chunking (recommended to use 1 million)
rows_to_read = snow.get_rows_to_read()
# row_count includes the header line, which is skipped below, so subtract it
rows_div_mill = (row_count - 1) / rows_to_read
iterations = math.ceil(rows_div_mill)
skip = 1

# set all types to str/obj for faster read
dtypes = {k: str for k in header.columns}
file_chunk_threads = []

The skip value is set to 1 because we do not want to read in the header every time — the column names are stored in our header variable from earlier.

Now we loop through the number of iterations, passing in gradually increasing skip values until all rows are handled. For each ZipToGzThread instance created, the skip variable is increased by whatever the rows_to_read value is set to in your snowflake_credentials.py file (the default is 1 million).

if filetype == 'zip':
    for i in range(1, iterations+1):
        file_chunk_threads.append(ZipToGzThread(i, skip, rows_to_read,
                                                filename, header,
                                                dtypes, staging_filepath,
                                                zipname=zipname))
        skip += rows_to_read
else:
    for i in range(1, iterations+1):
        file_chunk_threads.append(ZipToGzThread(i, skip, rows_to_read,
                                                filename, header,
                                                dtypes, staging_filepath,
                                                zipname=None))
        skip += rows_to_read
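The skip arithmetic in the loop can be checked on its own. This is a minimal sketch, with chunk_offsets as a hypothetical helper name. It subtracts the header line from the total line count, so a file whose data rows divide evenly by rows_to_read does not produce an extra, empty chunk.

```python
import math


def chunk_offsets(row_count, rows_to_read):
    """Return the skiprows value for each chunk of a file with one header line.

    row_count is the total number of lines in the file, header included.
    """
    data_rows = max(row_count - 1, 0)
    iterations = math.ceil(data_rows / rows_to_read)
    # Chunk i starts after the header (1 line) plus i full chunks.
    return [1 + i * rows_to_read for i in range(iterations)]


# 2.5 million data rows plus a header, read 1 million rows at a time.
offsets = chunk_offsets(2_500_001, 1_000_000)
print(offsets)
```

With these inputs, three threads are needed, and the third one reads only the remaining 500,000 rows.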

From here, we start each thread and join them so the program will not continue until each thread is finished running. This Stack Overflow post was helpful to me when trying to wrap my head around how the .join() method works.

for fc_thread in file_chunk_threads:
    fc_thread.start()

for fc_thread in file_chunk_threads:
    fc_thread.join()
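The start-then-join pattern can be seen in miniature below. The worker function is a made-up stand-in for the file chunking work. All threads are started first so they run concurrently, and only then does the main program wait on each one.

```python
import threading
import time

results = []
results_lock = threading.Lock()


def worker(worker_id):
    time.sleep(0.05)  # stand-in for real work such as chunking a file
    with results_lock:
        results.append(worker_id)


threads = [threading.Thread(target=worker, args=(i,)) for i in range(4)]

for t in threads:
    t.start()  # launch every worker before waiting on any of them

for t in threads:
    t.join()   # block until this particular worker has finished

# Every worker has finished by the time this line runs.
finished = sorted(results)
print(finished)
```

Calling join() inside the first loop, right after start(), would run the workers one at a time and lose the benefit of threading.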

The same list creation and thread starting process is done for all the staging files after they are in the local staging folder (previous step):

staging_files = []

for root, dirs, files in os.walk(f"{staging_filepath}/."):
    for f in files:
        staging_files.append(f)

put_statements = []

for staging_file in staging_files:
    put_statements.append(f'''
        PUT file://{staging_filepath}/{staging_file} @%{table_name}
        SOURCE_COMPRESSION = GZIP
        PARALLEL = 20
        AUTO_COMPRESS = FALSE''')

put_threads = []
put_counter = 0

# create thread list
for statement in put_statements:
    put_threads.append(SfExecutionThread(put_counter, statement))
    put_counter += 1

# execute the threads
for thread in put_threads:
    thread.start()

for thread in put_threads:
    thread.join()
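Starting one thread per staging file means every PUT opens its own Snowflake connection at roughly the same time, and with SSO each connection may trigger its own browser prompt. As an alternative, and not the approach this project uses, a thread pool from the standard library can cap how many statements run at once. In this sketch, run_statements and fake_execute are hypothetical names. In practice the execute argument would be execute_in_snowflake.

```python
from concurrent.futures import ThreadPoolExecutor


def run_statements(statements, execute, max_workers=4):
    """Run execute(statement) for each statement, at most max_workers at a time."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # list() consumes the results so an exception in any worker surfaces here.
        return list(pool.map(execute, statements))


executed = []


def fake_execute(statement):
    # Stand-in for execute_in_snowflake; it only records the statement.
    executed.append(statement)
    return len(statement)


demo_statements = [
    f"PUT file:///tmp/staging/part_{i}.gz @%my_table" for i in range(6)
]
lengths = run_statements(demo_statements, fake_execute, max_workers=2)
print(len(executed))
```

The with block does not exit until every submitted statement has finished, so it plays the same role as the join() loop.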

Moving from Staging to the Destination Table

After all staging files are placed into Snowflake, we perform one COPY INTO statement to move the files from the Snowflake table stage to the destination table.

copy_into_sql = f"""COPY INTO {table_name}
    FROM @%{table_name}
    FILE_FORMAT =
    (REPLACE_INVALID_CHARACTERS = TRUE
    SKIP_HEADER = 1
    FIELD_DELIMITER = '{field_delimiter}'
    RECORD_DELIMITER = '{record_delimiter}')
    ON_ERROR = CONTINUE"""
snow.snowflake_cursor().cursor().execute(copy_into_sql)

*I will admit that this is one spot where the file upload script has a hole. Every so often a row contains a special character that prevents it from being loaded into the destination table, and with ON_ERROR = CONTINUE that row is skipped rather than stopping the load. I am currently working on creating more data cleaning processes, or at least an output to an error log so these dropped rows are trackable. Use this COPY INTO statement at your own discretion, and please let me know if you have already found a solution to this issue!
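One possible starting point for tracking the skipped rows is Snowflake’s VALIDATE table function, which returns the errors encountered by an earlier COPY INTO execution. This is a minimal sketch and has not been tested against this project. build_validate_sql is a hypothetical helper name, and the '_last' job ID refers to the most recent load in the current session, so the query would have to run on the same connection as the COPY INTO.

```python
def build_validate_sql(table_name, job_id="_last"):
    """Build a query listing the rows rejected by a previous COPY INTO."""
    return f"SELECT * FROM TABLE(VALIDATE({table_name}, JOB_ID => '{job_id}'))"


validate_sql = build_validate_sql("my_table")
print(validate_sql)
```

The rows returned by this query could be fetched with a cursor and written to a log file. Because each call to snow.snowflake_cursor() opens a new connection, the COPY INTO and the validation query would need to share one connection object, or a specific query ID would need to be passed instead of '_last'.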

From here, all that is left to do is to clean up the staging folders, both locally and in Snowflake. The local folder deletion is optional and can be turned on or off in the snowflake_credentials.py file (the default is to delete all staging data).

# deleting all values from the Snowflake staging table
remove_staging_sql = f"REMOVE @%{table_name}"
snow.snowflake_cursor().cursor().execute(remove_staging_sql)

if snow.get_rm_staging_files():
    # remove files from local staging folder
    for root, dirs, files in os.walk(staging_filepath):
        for f in files:
            os.unlink(os.path.join(root, f))

if snow.get_rm_staging_folder():
    # deleting local staging folder
    os.rmdir(staging_filepath)
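One caveat with the local cleanup: os.rmdir only removes an empty folder, so it will fail if the staging folder contains subfolders, even after every file has been unlinked. A sketch of an alternative using the standard library’s shutil.rmtree, which removes a folder and everything inside it, is shown here on a throwaway folder rather than the project’s staging path:

```python
import os
import shutil
import tempfile

# Create a throwaway staging folder with a nested file to demonstrate.
staging_demo = tempfile.mkdtemp()
os.makedirs(os.path.join(staging_demo, "nested"))
with open(os.path.join(staging_demo, "nested", "part_1.gz"), "wb") as fh:
    fh.write(b"demo")

shutil.rmtree(staging_demo)  # removes files, subfolders, and the folder itself
still_exists = os.path.exists(staging_demo)
print(still_exists)
```

Since rmtree deletes everything under the given path without asking, it is worth double-checking that the path really is the staging folder before calling it.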

Your dataset should now be accessible in your selected Snowflake table. However, all data is different, and each specific data problem requires a unique solution. Feel free to use parts of this code, or the whole thing to make your file uploads to Snowflake a bit easier! Feel free to contact me with questions or comments at jess.brown@rxa.io.

Notes:

The threading approach was based on this InterWorks blog post.

I plan to add functionality that allows a user to sign into Snowflake using a basic Username/Password instead of SSO in the near future. Please check the project Git folder for any updates: https://github.com/rxa-io/pysnow

View the full Python x Snowflake Multithreaded Connector code on GitHub.

About the Author: Jess Brown is a Data Engineer at RXA, a Growth Marketing Technology company fueled by data science and applied artificial intelligence. Through our GMI (Growth Marketing Intelligence) platform and solutions, RXA helps attract, convert, retain, and grow the value of our clients’ customers by isolating specific business problems and developing actionable, data-driven AI solutions to achieve prescribed results. Our Media Mix Modeling and multi-touch attribution solutions are agnostic to mediums and channels, providing clients with an unbiased view of performance and optimization recommendations. You can learn more about RXA solutions by visiting us at RXA.IO or emailing learn@rxa.com. While RXA is agnostic to BI platforms, we have been recognized as Domo’s most influential partner for 2021 and named Domo’s Innovation Partner of the Year for the past two years.

RXA is a Growth Marketing Intelligence company fueled by data science and applied artificial intelligence.