
Upload data to S3 with Snappy compression in Parquet format

What am I compressing

I am compressing a file which contains one JSON record per line. So, the data looks like:

{"key1": "some-value-1", "key2": .......}
{"key1": "some-value-2", "key2": .......}
{"key1": "some-value-3", "key2": .......}

Why do I have a separate JSON record on each line

In order for Athena to read S3 data in JSON format, each JSON-encoded record should be represented on a separate line.

Ref - https://docs.aws.amazon.com/athena/latest/ug/parsing-JSON.html

This might not be relevant here, since we ultimately upload the data to S3 in Parquet format rather than JSON. So choose any other input layout if you want.
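For illustration, a file in this shape can be produced by writing one json.dumps() result per line; the sample records below are just placeholders:

import json

# Placeholder records - in practice these come from your data source
records = [
    {"key1": "some-value-1", "key2": "a"},
    {"key1": "some-value-2", "key2": "b"},
]

# Write one JSON object per line (JSON Lines), which is what Athena expects
with open("test_file", "w") as f:
    for record in records:
        f.write(json.dumps(record) + "\n")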

Compressing data

Process

  • Create a CSV file
  • Read data from the file which contains JSON data and put it in the CSV file
  • Create a pandas DataFrame from the CSV data
  • Write the DataFrame to Parquet with Snappy compression

Create a CSV file from the initial file with JSON data

Suppose our initial file is test_file

file_name = "test_file"

So we will

  • Create a CSV file test_file.csv
  • Read data from the file which contains JSON data and put it in the CSV file
import pandas
import csv
import json

# Open a new CSV file
with open(file_name + '.csv', mode='w', newline='') as csv_file:

    # These are the columns which will be visible in Athena
    # Our JSON data should contain these keys in each line

    fieldnames = [
        'key1',
        'key2'
    ]

    # Dictionary Writer
    # extrasaction='ignore' skips any JSON keys that are not listed in fieldnames
    writer = csv.DictWriter(csv_file, fieldnames=fieldnames, extrasaction='ignore')
    # Ref - https://www.kite.com/python/docs/csv.DictWriter

    # Write CSV Header
    writer.writeheader()
    # Ref - https://stackoverflow.com/questions/2982023/how-to-write-header-row-with-csv-dictwriter

    # Read data from initial file - which contains JSON in each line
    with open(file_name, 'r') as file:
        for line in file:
            # Write data to CSV file
            writer.writerow(json.loads(line))
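As a quick, optional sanity check, the header and first rows of the generated CSV can be printed:

# Optional sanity check: print the header and first couple of data rows
with open(file_name + '.csv') as csv_file:
    for _ in range(3):
        print(csv_file.readline().rstrip())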

Compress to Parquet with Snappy compression

# Create DataFrame from CSV file
df = pandas.read_csv(file_name + '.csv', header=0)

# Convert data to Parquet format, with Snappy compression
# index=False leaves the DataFrame index out of the Parquet file
# It creates the file "{file_name}.snappy" in Parquet format
# Note: to_parquet needs a Parquet engine such as pyarrow or fastparquet installed
df.to_parquet(file_name + '.snappy', compression='snappy', index=False)
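To make sure the file was written correctly, the Parquet output can be read back into a DataFrame; this is just an optional check:

# Optional: read the Parquet file back and inspect the first rows
check_df = pandas.read_parquet(file_name + '.snappy')
print(check_df.head())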

Upload data to S3

compressed_file = file_name + '.snappy'
s3_client.upload_file(
    Filename=compressed_file,
    Bucket=my_bucket,
    Key='s3-partition-key=' + date_or_something + '/' + compressed_file
)
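The call above assumes that s3_client, my_bucket and date_or_something are already defined. Here is a minimal sketch of that setup with boto3; the bucket name and partition value are placeholders:

import boto3
import datetime

# Placeholders - replace with your own bucket name and partition value
s3_client = boto3.client('s3')
my_bucket = 'my-example-bucket'
date_or_something = datetime.date.today().isoformat()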

Tip

We can provide additional parameters while uploading the file, like:

  • Object ACL
  • Additional object tagging
s3_client.upload_file(
    Filename=compressed_file,
    Bucket=my_bucket,
    Key='s3-partition-key=' + date_or_something + '/' + compressed_file,
    ExtraArgs={'ACL': 'bucket-owner-full-control', 'Tagging': 'submitted_by=noname'}
)