Upload data to S3 with Snappy compression in Parquet format
What am I compressing
I am compressing a file that contains one JSON record per line. The data looks like this:
{"key1": "some-value-1", "key2": .......}
{"key1": "some-value-2", "key2": .......}
{"key1": "some-value-3", "key2": .......}
Why do I have a separate JSON record on each line
For Athena to read S3 data in JSON format, each JSON-encoded record must be on its own line.
Ref - https://docs.aws.amazon.com/athena/latest/ug/parsing-JSON.html
This may not matter here, because the data is finally uploaded to S3 as Parquet and the CSV file is only an intermediate step. So choose any other input layout if you want.
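As a minimal sketch of the one-record-per-line layout described above, the snippet below writes a few records to a file and reads them back. The helper names (write_json_lines, read_json_lines), the sample records, and the temporary path are hypothetical and only for illustration.

```python
import json
import os
import tempfile


def write_json_lines(path, records):
    """Write each record as one JSON object per line."""
    with open(path, "w", encoding="utf-8") as fh:
        for record in records:
            # json.dumps without indent escapes newlines inside strings,
            # so every record stays on a single line
            fh.write(json.dumps(record) + "\n")


def read_json_lines(path):
    """Parse the file back, one JSON object per non-empty line."""
    with open(path, "r", encoding="utf-8") as fh:
        return [json.loads(line) for line in fh if line.strip()]


# Hypothetical sample records that mirror the key1/key2 layout
sample_records = [
    {"key1": "some-value-1", "key2": "other-value-1"},
    {"key1": "some-value-2", "key2": "other-value-2"},
]

demo_path = os.path.join(tempfile.mkdtemp(), "test_file")
write_json_lines(demo_path, sample_records)
round_tripped = read_json_lines(demo_path)
```

Reading the file back this way is a cheap check that every line is valid JSON before it goes through the rest of the process.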
Compressing data
Process
- Create a CSV file
- Read the data from the file that contains JSON records and write it to the CSV file
- Create a pandas DataFrame from the CSV data
- Write the DataFrame to a Parquet file with Snappy compression
Create a CSV file from the initial file with JSON data
Suppose our initial file is test_file
file_name = "test_file"
So we will
- Create a CSV file test_file.csv
- Read the data from the file that contains JSON records and write it to the CSV file
import pandas
import csv
import json
# Open a new CSV file
with open(file_name + '.csv', mode='w', newline='') as csv_file:
    # These are the columns that will be visible in Athena
    # Each JSON record in our file should contain these keys
    fieldnames = [
        'key1',
        'key2'
    ]
    # Dictionary Writer
    writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
    # Ref - https://www.kite.com/python/docs/csv.DictWriter
    # Write CSV Header
    writer.writeheader()
    # Ref - https://stackoverflow.com/questions/2982023/how-to-write-header-row-with-csv-dictwriter
    # Read data from initial file - which contains JSON in each line
    with open(file_name, 'r') as file:
        for line in file:
            # Write data to CSV file
            writer.writerow(json.loads(line))
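csv.DictWriter raises a ValueError by default when a record has a key that is not listed in fieldnames. The sketch below shows one possible way to tolerate extra or missing keys, using the extrasaction and restval arguments of DictWriter. The function name json_lines_to_csv and the sample lines are hypothetical; silently dropping unknown keys is an assumption that may not suit every data set.

```python
import csv
import io
import json

fieldnames = ["key1", "key2"]


def json_lines_to_csv(lines, out_file, fieldnames):
    """Write JSON records (one per line) as CSV rows; return the row count."""
    writer = csv.DictWriter(
        out_file,
        fieldnames=fieldnames,
        extrasaction="ignore",  # drop keys that are not in fieldnames
        restval="",             # fill missing keys with an empty string
    )
    writer.writeheader()
    rows_written = 0
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines instead of failing in json.loads
        writer.writerow(json.loads(line))
        rows_written += 1
    return rows_written


# Hypothetical input: one record with an extra key, one blank line,
# and one record with a missing key
sample_lines = [
    '{"key1": "a", "key2": "b", "unexpected": 1}',
    "",
    '{"key1": "c"}',
]
buffer = io.StringIO()
count = json_lines_to_csv(sample_lines, buffer, fieldnames)
csv_text = buffer.getvalue()
```

The same function works with a real file object opened with newline='' in place of the in-memory buffer.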
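Convert to Parquet with Snappy compression
# Create a DataFrame from the CSV file
df = pandas.read_csv(file_name + '.csv', header=0)
# Convert the data to Parquet format, with Snappy compression
# index=False leaves the DataFrame index out of the file; the partition is
# set by the S3 key at upload time, not by this argument
# This creates the file "{file_name}.snappy" in Parquet format
# (pandas needs a Parquet engine such as pyarrow or fastparquet installed)
df.to_parquet(file_name + '.snappy', compression='snappy', index=False)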
Upload data to S3
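import boto3

# my_bucket and date_or_something are placeholders; set them for your environment
s3_client = boto3.client('s3')
compressed_file = file_name + '.snappy'
s3_client.upload_file(
    Filename=compressed_file,
    Bucket=my_bucket,
    Key='s3-partition-key=' + date_or_something + '/' + compressed_file
)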
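The object key in the upload follows the name=value folder layout, so each partition value lands in its own folder. A small helper can keep that string building in one place. This is only a sketch: the function name build_partitioned_key, the optional prefix argument, and the example date are assumptions, not part of the original upload code.

```python
from datetime import date


def build_partitioned_key(partition_name, partition_value, file_name, prefix=""):
    """Return an S3 object key of the form [prefix/]name=value/file_name."""
    partition_dir = f"{partition_name}={partition_value}"
    # Strip stray slashes so the parts join with exactly one "/" between them
    parts = [p.strip("/") for p in (prefix, partition_dir, file_name) if p]
    return "/".join(parts)


# Hypothetical partition value: an ISO-formatted date
example_key = build_partitioned_key(
    "s3-partition-key",
    date(2020, 1, 31).isoformat(),
    "test_file.snappy",
)
```

The result can be passed as the Key argument of upload_file in place of the string concatenation.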
Tip
We can pass additional parameters when uploading a file, such as:
- Object ACL
- Additional object tagging
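s3_client.upload_file(
    Filename=compressed_file,
    Bucket=my_bucket,
    Key='s3-partition-key=' + date_or_something + '/' + compressed_file,
    ExtraArgs={'ACL': 'bucket-owner-full-control', 'Tagging': 'submitted_by=noname'}
)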
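The Tagging value is a URL query string (key=value pairs joined with &), so with more than one tag it can be easier to build it from a dictionary. The sketch below does that with urllib.parse.urlencode. The helper name build_extra_args and the second tag are hypothetical.

```python
from urllib.parse import urlencode


def build_extra_args(tags, acl="bucket-owner-full-control"):
    """Build an ExtraArgs dict for upload_file from an ACL and a tag dict."""
    # urlencode joins the pairs with "&" and percent-encodes special characters
    return {"ACL": acl, "Tagging": urlencode(tags)}


# Hypothetical tags; "submitted_by" matches the example above
extra_args = build_extra_args({"submitted_by": "noname", "source": "test_file"})
```

The resulting dictionary can be passed as ExtraArgs=extra_args in the upload_file call.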