{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Load NYC yellow-taxi trip data into an Iceberg table\n",
    "\n",
    "This notebook connects to an Iceberg REST catalog at `http://localhost:8181`, creates the `public` namespace and the `public.nyc_taxi` table (partitioned by month of `tpep_pickup_datetime`, sorted by `passenger_count`), and appends the monthly `yellow_tripdata_YYYY-MM.parquet` files for 2020-11 through 2022-12.\n",
    "\n",
    "Run the cells top to bottom on a fresh kernel. Table creation fails if `public.nyc_taxi` already exists, which prevents appending the same months twice."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "env: PYICEBERG_MAX_WORKERS=300\n"
     ]
    }
   ],
   "source": [
    "%env PYICEBERG_MAX_WORKERS=300"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Uncomment on a fresh environment (%pip installs into this kernel's environment):\n",
    "# %pip install \"pyiceberg[s3fs]\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyiceberg.catalog.rest import RestCatalog"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [],
   "source": [
    "# REST catalog named \"public\", served locally\n",
    "catalog = RestCatalog(\"public\", **{\n",
    "    \"uri\": \"http://localhost:8181\",\n",
    "})"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyiceberg.exceptions import NamespaceAlreadyExistsError\n",
    "\n",
    "# Tolerate an existing namespace so this cell can be re-run against the same catalog\n",
    "try:\n",
    "    catalog.create_namespace(\"public\")\n",
    "except NamespaceAlreadyExistsError:\n",
    "    pass"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyiceberg.schema import Schema\n",
    "from pyiceberg.types import (\n",
    "    NestedField,\n",
    "    LongType,\n",
    "    TimestampType,\n",
    "    DoubleType,\n",
    "    StringType,\n",
    ")\n",
    "\n",
    "# Iceberg schema for the trip files; every column is optional (required=False)\n",
    "schema = Schema(\n",
    "    NestedField(field_id=1, name=\"VendorID\", field_type=LongType(), required=False),\n",
    "    NestedField(field_id=2, name=\"tpep_pickup_datetime\", field_type=TimestampType(), required=False),\n",
    "    NestedField(field_id=3, name=\"tpep_dropoff_datetime\", field_type=TimestampType(), required=False),\n",
    "    NestedField(field_id=4, name=\"passenger_count\", field_type=DoubleType(), required=False),\n",
    "    NestedField(field_id=5, name=\"trip_distance\", field_type=DoubleType(), required=False),\n",
    "    NestedField(field_id=6, name=\"RatecodeID\", field_type=DoubleType(), required=False),\n",
    "    NestedField(field_id=7, name=\"store_and_fwd_flag\", field_type=StringType(), required=False),\n",
    "    NestedField(field_id=8, name=\"PULocationID\", field_type=LongType(), required=False),\n",
    "    NestedField(field_id=9, name=\"DOLocationID\", field_type=LongType(), required=False),\n",
    "    NestedField(field_id=10, name=\"payment_type\", field_type=LongType(), required=False),\n",
    "    NestedField(field_id=11, name=\"fare_amount\", field_type=DoubleType(), required=False),\n",
    "    NestedField(field_id=12, name=\"extra\", field_type=DoubleType(), required=False),\n",
    "    NestedField(field_id=13, name=\"mta_tax\", field_type=DoubleType(), required=False),\n",
    "    NestedField(field_id=14, name=\"tip_amount\", field_type=DoubleType(), required=False),\n",
    "    NestedField(field_id=15, name=\"tolls_amount\", field_type=DoubleType(), required=False),\n",
    "    NestedField(field_id=16, name=\"improvement_surcharge\", field_type=DoubleType(), required=False),\n",
    "    NestedField(field_id=17, name=\"total_amount\", field_type=DoubleType(), required=False),\n",
    "    NestedField(field_id=18, name=\"congestion_surcharge\", field_type=DoubleType(), required=False),\n",
    "    NestedField(field_id=19, name=\"airport_fee\", field_type=DoubleType(), required=False),\n",
    ")\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyiceberg.partitioning import PartitionSpec, PartitionField\n",
    "from pyiceberg.transforms import DayTransform, MonthTransform, YearTransform, BucketTransform\n",
    "\n",
    "# Partition by month of the pickup timestamp (source_id=2 is tpep_pickup_datetime)\n",
    "partition_spec = PartitionSpec(\n",
    "    PartitionField(source_id=2, field_id=1001, transform=MonthTransform(), name=\"tpep_pickup_datetime_month\"),\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyiceberg.table.sorting import SortOrder, SortField\n",
    "from pyiceberg.transforms import IdentityTransform\n",
    "\n",
    "# Sort on the raw value of passenger_count (source_id=4)\n",
    "sort_order = SortOrder(\n",
    "    SortField(source_id=4, transform=IdentityTransform())\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [],
   "source": [
    "# create_table raises if public.nyc_taxi already exists; that is left in place so a\n",
    "# re-run cannot append the same months to an existing table a second time.\n",
    "table = catalog.create_table(\n",
    "    identifier=\"public.nyc_taxi\",\n",
    "    schema=schema,\n",
    "    partition_spec=partition_spec,\n",
    "    sort_order=sort_order,\n",
    "    properties={\n",
    "        \"write.format.default\": \"parquet\",\n",
    "        \"write.parquet.compression-codec\": \"zstd\",\n",
    "        \"write.target-file-size-bytes\": \"536870912\",  # 512 MiB\n",
    "        # NOTE(review): 10000 looks like milliseconds; PyIceberg appears to read\n",
    "        # s3.connect-timeout as seconds - confirm the intended unit.\n",
    "        \"s3.connect-timeout\": \"10000\"\n",
    "    }\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "Appending files: 100%|██████████| 26/26 [11:30<00:00, 26.56s/it, Appended 2022-12]\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Total rows in the table: 73531304\n"
     ]
    }
   ],
   "source": [
    "import requests\n",
    "import io\n",
    "import pyarrow.parquet as pq\n",
    "from tqdm import tqdm\n",
    "from datetime import datetime, timedelta\n",
    "\n",
    "# Base URL for the Parquet files\n",
    "base_url = \"https://pub-f6a668561f5e4bd6ac651efd8c18998d.r2.dev/nyc_taxi/yellow_tripdata_{}.parquet\"\n",
    "\n",
    "# (connect, read) timeouts in seconds so one stalled download cannot hang the whole load\n",
    "request_timeout = (10, 60)\n",
    "\n",
    "# Generate a list of dates from 2020-11 to 2022-12\n",
    "start_date = datetime(2020, 11, 1)\n",
    "end_date = datetime(2022, 12, 1)\n",
    "date_list = []\n",
    "\n",
    "current_date = start_date\n",
    "while current_date <= end_date:\n",
    "    date_list.append(current_date.strftime(\"%Y-%m\"))\n",
    "    # The 1st of a month plus 32 days always falls in the next month; snap back to its 1st\n",
    "    current_date += timedelta(days=32)\n",
    "    current_date = current_date.replace(day=1)\n",
    "\n",
    "# Months that could not be downloaded and were therefore not appended\n",
    "failed_months = []\n",
    "\n",
    "# Create a progress bar\n",
    "with tqdm(total=len(date_list), desc=\"Appending files\") as pbar:\n",
    "    for date_str in date_list:\n",
    "        file_url = base_url.format(date_str)\n",
    "\n",
    "        # Download the file content; a network error is handled like a non-200 status\n",
    "        try:\n",
    "            file_response = requests.get(file_url, timeout=request_timeout)\n",
    "            download_error = None if file_response.status_code == 200 else file_response.status_code\n",
    "        except requests.RequestException as exc:\n",
    "            download_error = exc\n",
    "\n",
    "        if download_error is not None:\n",
    "            print(f\"Failed to download {date_str}: {download_error}\")\n",
    "            failed_months.append(date_str)\n",
    "            # Still advance the bar so it reaches its total when months are skipped\n",
    "            pbar.update(1)\n",
    "            pbar.set_postfix_str(f\"Skipped {date_str}\")\n",
    "            continue\n",
    "\n",
    "        # Read the Parquet file from the response content\n",
    "        file_content = io.BytesIO(file_response.content)\n",
    "        df = pq.read_table(file_content)\n",
    "\n",
    "        # Append to the Iceberg table\n",
    "        table.append(df)\n",
    "\n",
    "        pbar.update(1)\n",
    "        pbar.set_postfix_str(f\"Appended {date_str}\")\n",
    "\n",
    "if failed_months:\n",
    "    print(f\"Skipped months: {', '.join(failed_months)}\")\n",
    "\n",
    "# Print the total number of rows in the table after appending all files.\n",
    "# Only one column is projected so the count does not pull every column into memory.\n",
    "row_count = table.scan(selected_fields=(\"VendorID\",)).to_arrow().num_rows\n",
    "print(f\"Total rows in the table: {row_count}\")"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.11.6"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}