""" Terminal that mimics what a Point of Sale may operate like. """ import typer import polars as pl from retailtwin.cli.db import query_local_batches from retailtwin.cli.pos.models import Base, Sync, Direction, Cart, Item from retailtwin.utils import db_uri_from_session from datetime import datetime from sqlalchemy import create_engine, select from sqlalchemy.orm import Session from prompt_toolkit import PromptSession from prompt_toolkit.completion import WordCompleter # from prompt_toolkit.shortcuts import clear from rich.console import Console from rich.markdown import Markdown console = Console() HELP = """ # Retail twin POS This is a simple terminal that simulates a dumb PoS with a single-line screen. """ CACHE_FILE_URL = "sqlite:///pos.db" CONNECT_ERROR_MESSAGE = """ # ERROR Could not sync to the central database. The terminal will try to operate without a connection. """ def sync(session: Session, db_remote_uri: str, location: int): db_local_uri = db_uri_from_session(session) # Fetch data from the remote database try: inventory = query_local_batches(db_remote_uri, location) except: # noqa: E722 console.print(Markdown(CONNECT_ERROR_MESSAGE)) return # The price and the discount is fixed by the latest received batch items = ( inventory.select( [ pl.col("upc"), pl.col("price"), pl.col("discount_definition").alias("discount"), pl.col("received"), pl.col("received").max().over("upc").alias("max_received"), ] ) .filter(pl.col("received") == pl.col("max_received")) .select([pl.col("upc"), pl.col("price"), pl.col("discount")]) .with_columns(pl.col("price").str.strip().cast(pl.Float32) * 100) .with_columns(pl.col("price").cast(pl.Int32)) ) items.write_database( "items", db_local_uri, if_exists="replace", engine="sqlalchemy" ) # Sync all customers ( pl.read_database("select * from customers", db_remote_uri, engine="adbc") .select([pl.col("id"), pl.col("document")]) .write_database( "customers", db_local_uri, if_exists="replace", engine="sqlalchemy" ) ) # Add last sync sync = Sync(direction=Direction.pull) session.add(sync) session.commit() # Sync unsynced carts unsynced_carts = pl.read_database( "select * from carts where synced = false and total_amount is not null", db_local_uri, engine="adbc", ) if not unsynced_carts.is_empty(): ( unsynced_carts.select( [pl.col("checkout_dt").alias("checkout"), pl.col("customer")] ) .with_columns(location=location) .write_database( "carts", db_remote_uri, if_exists="append", engine="sqlalchemy" ) ) # Query for items to sync query = """ select c.id as cart, i.upc as upc, i.quantity from itemsoncart i join carts c on i.cart = c.id where c.synced = false""" pl.read_database(query, db_local_uri, engine="adbc").write_database( "itemsoncart", db_remote_uri, engine="sqlalchemy" ) # Set synced to cart at the end of the process for cart in session.scalars( select(Cart).filter(Cart.synced == False) # noqa: E712 ).all(): cart.synced = True session.commit() def load_items(session: Session): return [str(it) for it in session.scalars(select(Item.upc)).all()] def handle_command(command: str, cart_id: int, item_id: int, **kwargs): print(command, cart_id, item_id) session = kwargs["session"] if command == "n": # Start a new cart cart = Cart(checkin_dt=datetime.now(), synced=False) session.add(cart) session.commit() return cart.id, "", "" # Handle adding a new item elif cart_id is not None: # Check if the upc is in the database if session.scalar(select(Item).filter(Item.upc == int(command))).one_or_none(): return cart_id, item_id, "" else: return cart_id, "", "ERROR: Code not found" else: cart_id, item_id if item_id else "", "" def main(db_uri: str, location: int): console.print("Fetching data...") # Create the schema of the local database local_engine = create_engine(CACHE_FILE_URL) Base.metadata.create_all(local_engine) with Session(local_engine) as session: sync(session, db_uri, location) # Default completer with barcodes completer = WordCompleter(load_items(session)) # Update the local copy of the data. console.print(Markdown(HELP)) term = PromptSession() # State of the pos cart_id = "" item_id = "" error = "" while True: if cart_id and item_id: if error: prompt = f"[{item_id}# quantity] !{error}!>" else: prompt = f"[{item_id}# quantity]>" elif cart_id and not item_id: prompt = f"[{cart_id}# code]> " else: prompt = "#> " try: text = term.prompt(prompt, completer=completer) except KeyboardInterrupt: continue except EOFError: break else: if text in ["x", "q"]: break elif text == "h": console.print(Markdown(HELP)) elif text == "r": console.print("Refreshing...") sync(session, db_uri, location) completer = WordCompleter(load_items(session)) else: cart_id, item_id, error = handle_command( text, cart_id, item_id, location=location, completer=completer, db_remote_uri=db_uri, session=session, ) print("GoodBye!") app = typer.run(main)