Got to dump the file locally

Guillem Borrell 2024-05-21 20:50:17 +01:00
parent ea14f8c87e
commit ff781b6b9c
7 changed files with 105 additions and 53 deletions

.gitignore

@@ -161,3 +161,5 @@ cython_debug/
 #.idea/
 .DS_Store
+test/data/output/*


@@ -8,3 +8,4 @@ aiofiles
 duckdb
 polars
 pyarrow
+xlsx2csv
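
The new dependency is presumably for Excel ingestion on the polars side: xlsx2csv is the pure-Python engine polars can use for read_excel. A minimal sketch under that assumption, using the workbook this commit's test suite ships:

import polars as pl

# Hypothetical usage; the committed code reads Excel through DuckDB's st_read,
# so this only illustrates what the xlsx2csv dependency enables via polars.
df = pl.read_excel(
    "test/data/TestExcelHelloComputer.xlsx",
    sheet_name="answers",  # sheet name taken from the test assertions below
    engine="xlsx2csv",
)
print(df.head())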

src/hellocomputer/analytics.py

@ -0,0 +1,75 @@
import duckdb
class DDB:
def __init__(self):
self.db = duckdb.connect()
self.db.install_extension("spatial")
self.db.install_extension("httpfs")
self.db.load_extension("spatial")
self.db.load_extension("httpfs")
self.sheets = tuple()
self.path = ""
def gcs_secret(self, gcs_access: str, gcs_secret: str):
self.db.sql(f"""
CREATE SECRET (
TYPE GCS,
KEY_ID '{gcs_access}',
SECRET '{gcs_secret}')
""")
return self
def load_metadata(self, path: str = ""):
"""For some reason, the header is not loaded"""
self.db.sql(f"""
create table metadata as (
select
*
from
st_read('{path}',
layer='metadata'
)
)""")
self.sheets = tuple(
self.db.query("select Field2 from metadata where Field1 = 'Sheets'")
.fetchall()[0][0]
.split(",")
)
self.path = path
return self
def dump_local(self, path):
# TODO: Port to fsspec and have a single dump file
self.db.query(f"copy (select * from metadata) to '{path}/metadata.csv'")
for sheet in self.sheets:
self.db.query(f"""
copy
(
select
*
from
st_read
(
'{self.path}',
layer = '{sheet}'
)
)
to '{path}/{sheet}.csv'
""")
return self
def dump_gcs(self, bucketname, sid):
self.db.sql(f"""
copy
data
to
'gcs://{bucketname}/{sid}/data.csv';
""")
return self
def query(self, sql):
return self.db.query(sql)
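
For orientation, a minimal sketch of the intended call chain, mirroring the new test at the bottom of this commit (paths relative to the repo root):

from hellocomputer.analytics import DDB

# Load the workbook's metadata sheet, then dump metadata plus every data
# sheet to CSV files under test/output/.
db = (
    DDB()
    .load_metadata("test/data/TestExcelHelloComputer.xlsx")
    .dump_local("test/output")
)
print(db.sheets)  # ('answers',) for the bundled test workbook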


@@ -1,11 +1,10 @@
 import aiofiles
-import duckdb
-import polars as pl
 import s3fs
 from fastapi import APIRouter, File, UploadFile
 from fastapi.responses import JSONResponse

 from ..config import settings
+from ..analytics import DDB

 router = APIRouter()
@@ -28,56 +27,13 @@ async def upload_file(file: UploadFile = File(...), sid: str = ""):
     gcs.makedir(f"{settings.gcs_bucketname}/{sid}")

-    db = duckdb.connect()
-    db.install_extension("spatial")
-    db.install_extension("httpfs")
-    db.load_extension("httpfs")
-    db.load_extension("spatial")
-
-    db.sql(f"""
-    CREATE SECRET (
-        TYPE GCS,
-        KEY_ID '{settings.gcs_access}',
-        SECRET '{settings.gcs_secret}')
-    """)
-
-    db.sql(f"""
-        create table metadata as (
-        select
-            *
-        from
-            st_read('{f.name}',
-                    layer='metadata',
-                    open_options=['HEADERS_FORCE', 'FIELD_TYPES=auto']
-                    )
-        )""")
-
-    metadata = db.query("select * from metadata").pl()
-    sheets = metadata.select(pl.col("Key") == "Sheets")
-    print(sheets)
-
-    for sheet in sheets.to_dict():
-        print(sheet)
-        db.sql(
-            f"""
-            create table data as (
-            select
-                *
-            from
-                st_read('{f.name}',
-                        layer='data',
-                        open_options=['HEADERS_FORCE', 'FIELD_TYPES=auto']
-                        )
-            )"""
-        )
-
-    db.sql(f"""
-       copy
-           data
-       to
-           'gcs://{settings.gcs_bucketname}/{sid}/data.csv';
-    """)
+    (
+        DDB()
+        .gcs_secret(settings.gcs_access, settings.gcs_secret)
+        .load_metadata(f.name)
+        .dump_gcs(settings.gcs_bucketname, sid)
+    )

     return JSONResponse(
         content={"message": "File uploaded successfully"}, status_code=200
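
A hedged smoke test for the endpoint above. The route decorator sits outside this hunk, so the /upload path and the application module are assumptions; the "file" form field and "sid" query parameter come from the visible signature:

from fastapi.testclient import TestClient

from hellocomputer.main import app  # assumed app module; not shown in this diff

client = TestClient(app)
with open("test/data/TestExcelHelloComputer.xlsx", "rb") as fh:
    response = client.post("/upload", params={"sid": "1234"}, files={"file": fh})
assert response.status_code == 200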

test/data/TestExcelHelloComputer.xlsx
Binary file not shown.

test/output/.gitignore

@@ -0,0 +1 @@
*.csv

test/test_load.py

@@ -0,0 +1,17 @@
import hellocomputer
from hellocomputer.analytics import DDB
from pathlib import Path

TEST_DATA_FOLDER = Path(hellocomputer.__file__).parents[2] / "test" / "data"
TEST_OUTPUT_FOLDER = Path(hellocomputer.__file__).parents[2] / "test" / "output"


def test_load_data():
    db = (
        DDB()
        .load_metadata(TEST_DATA_FOLDER / "TestExcelHelloComputer.xlsx")
        .dump_local(TEST_OUTPUT_FOLDER)
    )

    assert db.sheets == ("answers",)
    assert (TEST_OUTPUT_FOLDER / "answers.csv").exists()
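
Assuming pytest as the test runner (the bare-function, assert-based style suggests it), the module can be run on its own:

import pytest

# Equivalent to `pytest -q test/test_load.py` from the repo root; the CSVs it
# writes under test/output/ are kept out of git by the new .gitignore entries.
raise SystemExit(pytest.main(["-q", "test/test_load.py"]))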