buster/python/populate_warehouse.ipynb

233 lines
7.4 KiB
Plaintext

{
"cells": [
{
"cell_type": "code",
"execution_count": 162,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"env: PYICEBERG_MAX_WORKERS=300\n"
]
}
],
"source": [
"# Exported first, ahead of the pyiceberg import in a later cell.\n",
"# Keep the comment on its own line: %env treats everything after '=' as the value.\n",
"%env PYICEBERG_MAX_WORKERS=300"
]
},
{
"cell_type": "code",
"execution_count": 163,
"metadata": {},
"outputs": [],
"source": [
"# One-time setup: uncomment to install the dependencies into this kernel's environment.\n",
"# %pip (rather than !pip) installs into the environment the running kernel uses.\n",
"# The pyarrow extra backs the Arrow reads/appends further down; tqdm draws the progress bar.\n",
"# %pip install \"pyiceberg[s3fs,pyarrow]\" requests tqdm"
]
},
{
"cell_type": "code",
"execution_count": 164,
"metadata": {},
"outputs": [],
"source": [
"from pyiceberg.catalog.rest import RestCatalog"
]
},
{
"cell_type": "code",
"execution_count": 165,
"metadata": {},
"outputs": [],
"source": [
"# REST catalog client named \"public\", pointed at the local endpoint on port 8181.\n",
"catalog = RestCatalog(\"public\", uri=\"http://localhost:8181\")"
]
},
{
"cell_type": "code",
"execution_count": 166,
"metadata": {},
"outputs": [],
"source": [
"from pyiceberg.exceptions import NamespaceAlreadyExistsError\n",
"\n",
"# Creating a namespace that is already present raises; treat that as success so this\n",
"# cell can be re-run against a catalog where \"public\" was created earlier.\n",
"try:\n",
"    catalog.create_namespace(\"public\")\n",
"except NamespaceAlreadyExistsError:\n",
"    pass"
]
},
{
"cell_type": "code",
"execution_count": 167,
"metadata": {},
"outputs": [],
"source": [
"from pyiceberg.schema import Schema\n",
"from pyiceberg.types import (\n",
"    DoubleType,\n",
"    LongType,\n",
"    NestedField,\n",
"    StringType,\n",
"    TimestampType,\n",
")\n",
"\n",
"# Columns in table order as (name, Iceberg type). Field ids are assigned 1..19 from this\n",
"# order, and the partition spec and sort order cells refer to columns by those ids.\n",
"taxi_columns = [\n",
"    (\"VendorID\", LongType()),\n",
"    (\"tpep_pickup_datetime\", TimestampType()),\n",
"    (\"tpep_dropoff_datetime\", TimestampType()),\n",
"    (\"passenger_count\", DoubleType()),\n",
"    (\"trip_distance\", DoubleType()),\n",
"    (\"RatecodeID\", DoubleType()),\n",
"    (\"store_and_fwd_flag\", StringType()),\n",
"    (\"PULocationID\", LongType()),\n",
"    (\"DOLocationID\", LongType()),\n",
"    (\"payment_type\", LongType()),\n",
"    (\"fare_amount\", DoubleType()),\n",
"    (\"extra\", DoubleType()),\n",
"    (\"mta_tax\", DoubleType()),\n",
"    (\"tip_amount\", DoubleType()),\n",
"    (\"tolls_amount\", DoubleType()),\n",
"    (\"improvement_surcharge\", DoubleType()),\n",
"    (\"total_amount\", DoubleType()),\n",
"    (\"congestion_surcharge\", DoubleType()),\n",
"    (\"airport_fee\", DoubleType()),\n",
"]\n",
"\n",
"# Every column is declared optional (required=False).\n",
"schema = Schema(\n",
"    *(\n",
"        NestedField(field_id=position, name=column_name, field_type=column_type, required=False)\n",
"        for position, (column_name, column_type) in enumerate(taxi_columns, start=1)\n",
"    )\n",
")\n"
]
},
{
"cell_type": "code",
"execution_count": 168,
"metadata": {},
"outputs": [],
"source": [
"from pyiceberg.partitioning import PartitionField, PartitionSpec\n",
"from pyiceberg.transforms import BucketTransform, DayTransform, MonthTransform, YearTransform\n",
"\n",
"# Month transform over schema field id 2 (tpep_pickup_datetime).\n",
"pickup_month_field = PartitionField(\n",
"    source_id=2,\n",
"    field_id=1001,\n",
"    transform=MonthTransform(),\n",
"    name=\"tpep_pickup_datetime_month\",\n",
")\n",
"\n",
"partition_spec = PartitionSpec(pickup_month_field)"
]
},
{
"cell_type": "code",
"execution_count": 169,
"metadata": {},
"outputs": [],
"source": [
"from pyiceberg.table.sorting import SortField, SortOrder\n",
"from pyiceberg.transforms import IdentityTransform\n",
"\n",
"# Sort on the untransformed value of schema field id 4 (passenger_count).\n",
"passenger_count_sort = SortField(source_id=4, transform=IdentityTransform())\n",
"\n",
"sort_order = SortOrder(passenger_count_sort)"
]
},
{
"cell_type": "code",
"execution_count": 170,
"metadata": {},
"outputs": [],
"source": [
"# Properties stored on the table: Parquet data files, zstd codec,\n",
"# 536870912-byte (512 MiB) target file size.\n",
"table_properties = {\n",
"    \"write.format.default\": \"parquet\",\n",
"    \"write.parquet.compression-codec\": \"zstd\",\n",
"    \"write.target-file-size-bytes\": \"536870912\",\n",
"    \"s3.connect-timeout\": \"10000\",\n",
"}\n",
"\n",
"# Create public.nyc_taxi from the schema, partition spec and sort order built above.\n",
"# Not idempotent on purpose: re-running against an existing table raises rather than\n",
"# letting the ingestion cell append the same files a second time.\n",
"table = catalog.create_table(\n",
"    identifier=\"public.nyc_taxi\",\n",
"    schema=schema,\n",
"    partition_spec=partition_spec,\n",
"    sort_order=sort_order,\n",
"    properties=table_properties,\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 171,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"Appending files: 100%|██████████| 26/26 [10:19<00:00, 23.83s/it, Appended yellow_tripdata_2021-09.parquet]\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Total rows in the table: 73531304\n"
]
}
],
"source": [
"import os\n",
"import requests\n",
"import io\n",
"import pyarrow.parquet as pq\n",
"from tqdm import tqdm\n",
"\n",
"# GitHub repository information\n",
"repo_owner = \"buster-so\"\n",
"repo_name = \"sample-data\"\n",
"folder_path = \"nyc_taxi\"\n",
"\n",
"# (connect, read) timeouts in seconds, so a stalled connection raises instead of\n",
"# hanging the kernel indefinitely.\n",
"api_timeout = (10, 30)\n",
"download_timeout = (10, 300)\n",
"\n",
"# GitHub API endpoint to get repository contents\n",
"api_url = f\"https://api.github.com/repos/{repo_owner}/{repo_name}/contents/{folder_path}\"\n",
"\n",
"# Fetch the list of files in the repository\n",
"response = requests.get(api_url, timeout=api_timeout)\n",
"if response.status_code != 200:\n",
"    raise Exception(f\"Failed to fetch repository contents: {response.status_code}\")\n",
"\n",
"# Keep only the Parquet entries of the directory listing\n",
"files = [item for item in response.json() if item['name'].endswith('.parquet')]\n",
"\n",
"# Download each file, read it into Arrow and append it to the Iceberg table\n",
"with tqdm(total=len(files), desc=\"Appending files\") as pbar:\n",
"    for file in files:\n",
"        # Download the file content\n",
"        file_url = file['download_url']\n",
"        file_response = requests.get(file_url, timeout=download_timeout)\n",
"        if file_response.status_code != 200:\n",
"            # Best effort: report the failed file and carry on with the rest\n",
"            print(f\"Failed to download {file['name']}: {file_response.status_code}\")\n",
"            continue\n",
"\n",
"        # Read the Parquet file from the response content\n",
"        file_content = io.BytesIO(file_response.content)\n",
"        df = pq.read_table(file_content)\n",
"\n",
"        # Append to the Iceberg table\n",
"        table.append(df)\n",
"\n",
"        pbar.update(1)\n",
"        pbar.set_postfix_str(f\"Appended {file['name']}\")\n",
"\n",
"# Count rows by projecting a single column; loading every column of the whole table\n",
"# into memory just to take its length is unnecessary.\n",
"row_count = table.scan(selected_fields=(\"VendorID\",)).to_arrow().num_rows\n",
"print(f\"Total rows in the table: {row_count}\")"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.6"
}
},
"nbformat": 4,
"nbformat_minor": 2
}