from pathlib import Path from sqlalchemy import create_engine, text from sqlalchemy.orm import Session import retailtwin PACKAGE_ROOT = Path(retailtwin.__file__).parent / "retail" def funcandproc(db_uri: str): """Write functions and procedures into the given Postgresql database Args: db_uri (str): _description_ """ engine = create_engine(db_uri, echo=True) with Session(engine) as session: # Functions can be redefined in postgresql for predicate in (PACKAGE_ROOT / "sql").glob("*.sql"): print(f"Syncing {predicate}") with predicate.open() as sql: session.execute(text(sql.read())) session.commit() # Views have to be recreated if schema changes for predicate in (PACKAGE_ROOT / "sql" / "views").glob("*.sql"): print(f"Syncing view in {predicate}") # Get view name from the file name view_name = predicate.stem.removesuffix(".sql") with predicate.open() as sql: # First remove the view session.execute(text(f"drop view {view_name}")) session.commit() # And sync it session.execute(text(sql.read())) session.commit()