Upload data to S3 with Snappy compression in Parquet format
What am I compressing?
I am compressing a file which contains JSON data on each line, so the data looks like:
{"key1": "some-value-1", "key2": .......}
{"key1": "some-value-2", "key2": .......}
{"key1": "some-value-3", "key2": .......}
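For reference, here is a minimal sketch that creates such a test_file locally; the records and values below are made up purely for illustration.
import json

# Hypothetical sample records, purely for illustration
records = [
    {"key1": "some-value-1", "key2": "abc"},
    {"key1": "some-value-2", "key2": "def"},
    {"key1": "some-value-3", "key2": "ghi"},
]

# Write one JSON-encoded record per line (newline-delimited JSON)
with open("test_file", "w") as f:
    for record in records:
        f.write(json.dumps(record) + "\n")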
Why do I have a separate JSON in each line?
In order for Athena to read S3 data in JSON format, each JSON-encoded record should be represented on a separate line.
Ref - https://docs.aws.amazon.com/athena/latest/ug/parsing-JSON.html
This might not be relevant here, since the JSON file is only an intermediate step and we finally upload the data to S3 as Parquet. So, choose any other input format if you want.
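As an aside, if the input is already newline-delimited JSON, pandas can read it directly, so the intermediate CSV step used below could in principle be skipped. This is only an alternative sketch, not the approach followed in the rest of this note.
import pandas

# lines=True tells pandas to expect one JSON record per line
df = pandas.read_json("test_file", lines=True)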
Compressing data
Process
- Create a CSV file
- Read data from the file which contains JSON data and put it in the CSV file
- Create a pandas DataFrame from the CSV data
- Convert the DataFrame to Parquet with Snappy compression
Create a CSV file from the initial file with JSON data
Suppose our initial file is test_file
file_name = "test_file"
So we will:
- Create a CSV file
  test_file.csv
- Read data from the file which contains JSON data and put it in the CSV file
import pandas
import csv
import json

# Open a new CSV file
with open(file_name + '.csv', mode='w', newline='') as csv_file:
    # These are the columns which will be visible in Athena.
    # Our input file should have JSON data with these keys.
    fieldnames = [
        'key1',
        'key2'
    ]
    # Dictionary writer
    writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
    # Ref - https://www.kite.com/python/docs/csv.DictWriter
    # Write CSV header
    writer.writeheader()
    # Ref - https://stackoverflow.com/questions/2982023/how-to-write-header-row-with-csv-dictwriter
    # Read data from the initial file - which contains JSON in each line
    with open(file_name, 'r') as file:
        for line in file:
            # Write each JSON record as a CSV row
            writer.writerow(json.loads(line))
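Note that csv.DictWriter raises a ValueError if a record contains a key that is not listed in fieldnames. If the input lines may carry extra keys and we simply want to drop them (an assumption about the data, not something required above), the writer can be created like this instead:
# Silently drop keys that are not in fieldnames instead of raising ValueError
writer = csv.DictWriter(csv_file, fieldnames=fieldnames, extrasaction='ignore')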
Convert to Parquet with Snappy compression
# Create a DataFrame from the CSV file
df = pandas.read_csv(file_name + '.csv', header=0)
# Convert the data to Parquet format, with Snappy compression.
# index=False keeps the DataFrame index out of the Parquet file;
# the partition ("s3-partition-key") is applied later via the S3 object key.
# This creates the file "{file_name}.snappy" in Parquet format.
df.to_parquet(file_name + '.snappy', compression='snappy', index=False)
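to_parquet requires a Parquet engine such as pyarrow or fastparquet to be installed. As an optional sanity check, the file can be read back before uploading:
# Optional: read the Parquet file back to verify it was written correctly
check_df = pandas.read_parquet(file_name + '.snappy')
print(check_df.head())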
Upload data to S3
import boto3

# Assumes default credentials/region are configured for boto3
s3_client = boto3.client('s3')

compressed_file = file_name + '.snappy'
s3_client.upload_file(
    Filename=compressed_file,
    Bucket=my_bucket,
    Key='s3-partition-key=' + date_or_something + '/' + compressed_file
)
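Here my_bucket and date_or_something are placeholders. As a purely hypothetical example, a daily date-based partition value could be built like this:
import datetime

# Hypothetical values for the placeholders used above
my_bucket = 'my-example-bucket'  # assumed bucket name
date_or_something = datetime.date.today().isoformat()  # 'YYYY-MM-DD'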
Tip
We can provide additional parameters while uploading the file, like:
- Object ACL
- Additional object tagging
s3_client.upload_file(
    Filename=compressed_file,
    Bucket=my_bucket,
    Key='s3-partition-key=' + date_or_something + '/' + compressed_file,
    ExtraArgs={'ACL': 'bucket-owner-full-control', 'Tagging': 'submitted_by=noname'}
)
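To confirm the tags actually landed on the object, they can be read back after the upload; this is only an optional check using the same placeholders as above:
# Optional: verify the tags on the uploaded object
response = s3_client.get_object_tagging(
    Bucket=my_bucket,
    Key='s3-partition-key=' + date_or_something + '/' + compressed_file
)
print(response['TagSet'])  # e.g. [{'Key': 'submitted_by', 'Value': 'noname'}]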