mirror of https://github.com/buster-so/buster.git
233 lines
7.3 KiB
Plaintext
233 lines
7.3 KiB
Plaintext
{
|
|
"cells": [
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 8,
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"env: PYICEBERG_MAX_WORKERS=300\n"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"%env PYICEBERG_MAX_WORKERS=300"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 2,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"#!pip install \"pyiceberg[s3fs]\""
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 9,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"from pyiceberg.catalog.rest import RestCatalog"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 10,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"# Connect to the Iceberg REST catalog at http://localhost:8181.\n",
"# The first positional argument is the catalog name; connection settings\n",
"# are passed as plain keyword properties.\n",
"catalog = RestCatalog(\"public\", uri=\"http://localhost:8181\")"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 11,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"from pyiceberg.exceptions import NamespaceAlreadyExistsError\n",
"\n",
"# Creating a namespace that is already present raises, which would stop a\n",
"# re-run of this notebook at this cell; an existing namespace is fine here.\n",
"try:\n",
"    catalog.create_namespace(\"public\")\n",
"except NamespaceAlreadyExistsError:\n",
"    pass"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 12,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"from pyiceberg.schema import Schema\n",
"from pyiceberg.types import (\n",
"    NestedField,\n",
"    LongType,\n",
"    TimestampType,\n",
"    DoubleType,\n",
"    StringType,\n",
")\n",
"\n",
"# (column name, Iceberg type) pairs in field-id order; ids 1..19 are assigned\n",
"# from the position in this list.\n",
"taxi_columns = [\n",
"    (\"VendorID\", LongType()),\n",
"    (\"tpep_pickup_datetime\", TimestampType()),\n",
"    (\"tpep_dropoff_datetime\", TimestampType()),\n",
"    (\"passenger_count\", DoubleType()),\n",
"    (\"trip_distance\", DoubleType()),\n",
"    (\"RatecodeID\", DoubleType()),\n",
"    (\"store_and_fwd_flag\", StringType()),\n",
"    (\"PULocationID\", LongType()),\n",
"    (\"DOLocationID\", LongType()),\n",
"    (\"payment_type\", LongType()),\n",
"    (\"fare_amount\", DoubleType()),\n",
"    (\"extra\", DoubleType()),\n",
"    (\"mta_tax\", DoubleType()),\n",
"    (\"tip_amount\", DoubleType()),\n",
"    (\"tolls_amount\", DoubleType()),\n",
"    (\"improvement_surcharge\", DoubleType()),\n",
"    (\"total_amount\", DoubleType()),\n",
"    (\"congestion_surcharge\", DoubleType()),\n",
"    (\"airport_fee\", DoubleType()),\n",
"]\n",
"\n",
"# Every column is declared optional (required=False).\n",
"schema = Schema(\n",
"    *(\n",
"        NestedField(field_id=position, name=column_name, field_type=column_type, required=False)\n",
"        for position, (column_name, column_type) in enumerate(taxi_columns, start=1)\n",
"    )\n",
")\n"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 13,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"from pyiceberg.partitioning import PartitionSpec, PartitionField\n",
"from pyiceberg.transforms import DayTransform, MonthTransform, YearTransform, BucketTransform\n",
"\n",
"# Single partition field: a month transform applied to schema field id 2\n",
"# (tpep_pickup_datetime).\n",
"pickup_month_field = PartitionField(\n",
"    source_id=2,\n",
"    field_id=1001,\n",
"    transform=MonthTransform(),\n",
"    name=\"tpep_pickup_datetime_month\",\n",
")\n",
"partition_spec = PartitionSpec(pickup_month_field)"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 14,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"from pyiceberg.table.sorting import SortOrder, SortField\n",
"from pyiceberg.transforms import IdentityTransform\n",
"\n",
"# Sort on the untransformed value of schema field id 4 (passenger_count).\n",
"passenger_count_sort = SortField(source_id=4, transform=IdentityTransform())\n",
"sort_order = SortOrder(passenger_count_sort)"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 15,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"# Properties handed to the catalog when the table is created.\n",
"table_properties = {\n",
"    \"write.format.default\": \"parquet\",\n",
"    \"write.parquet.compression-codec\": \"zstd\",\n",
"    \"write.target-file-size-bytes\": \"536870912\",\n",
"    \"s3.connect-timeout\": \"10000\",\n",
"}\n",
"\n",
"# Create public.nyc_taxi using the schema, partition spec and sort order\n",
"# defined in the preceding cells.\n",
"table = catalog.create_table(\n",
"    identifier=\"public.nyc_taxi\",\n",
"    schema=schema,\n",
"    partition_spec=partition_spec,\n",
"    sort_order=sort_order,\n",
"    properties=table_properties,\n",
")"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 16,
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"Appending files: 100%|██████████| 26/26 [11:30<00:00, 26.56s/it, Appended 2022-12]\n"
|
|
]
|
|
},
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"Total rows in the table: 73531304\n"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"import requests\n",
"import io\n",
"import pyarrow.parquet as pq\n",
"from tqdm import tqdm\n",
"from datetime import datetime, timedelta\n",
"\n",
"# Base URL for the Parquet files; {} is replaced by a YYYY-MM month string\n",
"base_url = \"https://pub-f6a668561f5e4bd6ac651efd8c18998d.r2.dev/nyc_taxi/yellow_tripdata_{}.parquet\"\n",
"\n",
"# Seconds to wait for the server before giving up on a download. requests\n",
"# applies no timeout by default, so a stalled connection would hang the cell.\n",
"download_timeout = 60\n",
"\n",
"# Generate a list of dates from 2020-11 to 2022-12\n",
"start_date = datetime(2020, 11, 1)\n",
"end_date = datetime(2022, 12, 1)\n",
"date_list = []\n",
"\n",
"current_date = start_date\n",
"while current_date <= end_date:\n",
"    date_list.append(current_date.strftime(\"%Y-%m\"))\n",
"    # Adding 32 days to the 1st always lands in the next month; snap back to day 1\n",
"    current_date = (current_date + timedelta(days=32)).replace(day=1)\n",
"\n",
"# Create a progress bar; it advances once per month, appended or skipped\n",
"with tqdm(total=len(date_list), desc=\"Appending files\") as pbar:\n",
"    for date_str in date_list:\n",
"        file_url = base_url.format(date_str)\n",
"\n",
"        # Download the file content; network errors are reported and the month\n",
"        # is skipped, the same way a non-200 response is handled\n",
"        try:\n",
"            file_response = requests.get(file_url, timeout=download_timeout)\n",
"        except requests.RequestException as exc:\n",
"            print(f\"Failed to download {date_str}: {exc}\")\n",
"            pbar.update(1)\n",
"            continue\n",
"        if file_response.status_code != 200:\n",
"            print(f\"Failed to download {date_str}: {file_response.status_code}\")\n",
"            pbar.update(1)\n",
"            continue\n",
"\n",
"        # Read the Parquet file from the response content\n",
"        month_table = pq.read_table(io.BytesIO(file_response.content))\n",
"\n",
"        # Append to the Iceberg table\n",
"        table.append(month_table)\n",
"\n",
"        pbar.update(1)\n",
"        pbar.set_postfix_str(f\"Appended {date_str}\")\n",
"\n",
"# Print the total number of rows in the table after appending all files.\n",
"# Only one column is projected so the count does not pull every column of\n",
"# every row into memory.\n",
"row_count = table.scan(selected_fields=(\"VendorID\",)).to_arrow().num_rows\n",
"print(f\"Total rows in the table: {row_count}\")"
|
|
]
|
|
}
|
|
],
|
|
"metadata": {
|
|
"kernelspec": {
|
|
"display_name": "Python 3",
|
|
"language": "python",
|
|
"name": "python3"
|
|
},
|
|
"language_info": {
|
|
"codemirror_mode": {
|
|
"name": "ipython",
|
|
"version": 3
|
|
},
|
|
"file_extension": ".py",
|
|
"mimetype": "text/x-python",
|
|
"name": "python",
|
|
"nbconvert_exporter": "python",
|
|
"pygments_lexer": "ipython3",
|
|
"version": "3.11.6"
|
|
}
|
|
},
|
|
"nbformat": 4,
|
|
"nbformat_minor": 2
|
|
}
|