diff --git a/Cargo.toml b/Cargo.toml index 82a167c..35e1a23 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "dr" description = "Command-line data file processing in Rust" -version = "0.3.1" +version = "0.4" edition = "2021" include = [ "**/*.rs", @@ -14,7 +14,4 @@ repository = "https://git.guillemborrell.es/guillem/dr" [dependencies] clap = {version = "4.0", features = ["cargo"]} -polars = "0.25" -polars-sql = "0.2.1" -polars-lazy = "0.25" -polars-io = {"version" = "0.25", features = ["parquet"]} +polars = {"version" = "0.25", features = ["sql", "lazy", "parquet", "decompress", "ipc", "ipc_streaming", "docs-selection"]} diff --git a/README.md b/README.md index e70f8d9..daa3eef 100644 --- a/README.md +++ b/README.md @@ -9,51 +9,21 @@ You may wonder why I'm implementing this, since there's already [xsv](https://gi 1. This what I'm implementing to learn Rust 2. The Rust data ecosystem has evolved immensely since xsv was sarted. Now we can add things like SQL commands to filter csv files, or translate results to parquet files. -## Example - -```bash -$ head wine.csv -Wine,Alcohol,Malic.acid,Ash,Acl,Mg,Phenols,Flavanoids,Nonflavanoid.phenols,Proanth,Color.int,Hue,OD,Proline -1,14.23,1.71,2.43,15.6,127,2.8,3.06,.28,2.29,5.64,1.04,3.92,1065 -1,13.2,1.78,2.14,11.2,100,2.65,2.76,.26,1.28,4.38,1.05,3.4,1050 -1,13.16,2.36,2.67,18.6,101,2.8,3.24,.3,2.81,5.68,1.03,3.17,1185 -1,14.37,1.95,2.5,16.8,113,3.85,3.49,.24,2.18,7.8,.86,3.45,1480 -1,13.24,2.59,2.87,21,118,2.8,2.69,.39,1.82,4.32,1.04,2.93,735 -1,14.2,1.76,2.45,15.2,112,3.27,3.39,.34,1.97,6.75,1.05,2.85,1450 -1,14.39,1.87,2.45,14.6,96,2.5,2.52,.3,1.98,5.25,1.02,3.58,1290 -1,14.06,2.15,2.61,17.6,121,2.6,2.51,.31,1.25,5.05,1.06,3.58,1295 -1,14.83,1.64,2.17,14,97,2.8,2.98,.29,1.98,5.2,1.08,2.85,1045 - -$ cat wine.csv | dr sql "select Wine, avg(Alcohol) from this group by Wine" | dr print -shape: (3, 2) -┌──────┬───────────┐ -│ Wine ┆ Alcohol │ -│ --- ┆ --- │ -│ i64 ┆ f64 │ -╞══════╪═══════════╡ -│ 3 ┆ 13.15375 │ -├╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┤ -│ 1 ┆ 13.744746 │ -├╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┤ -│ 2 ┆ 12.278732 │ -└──────┴───────────┘ -``` - -## Howto -The `dr` command offers a set of subcommands, each one of them with a different functionality. You can get the available subcommands with: +## Help -```bash +``` $ dr --help Command-line data file processing in Rust Usage: dr [COMMAND] Commands: + csv Read csv, output arrow stream sql Runs a sql statement on the file print Pretty prints the table rpq Read parquet file - wpq Write to a parquet file + wpq Write to a paquet file help Print this message or the help of the given subcommand(s) Options: @@ -61,11 +31,21 @@ Options: -V, --version Print version information ``` -Subcommands can be pipelined unless reading from a file, writing to a file, or pretty prints data. What goes through the pipeline is a plain-text comma separated values with a header. While this may not be the best choice in terms of performance, allows `dr` subcommands to be combined with the usual unix-style command-line tools like `cat`, `head`, `grep`, `awk` and `sed`: +## Howto + +`dr` is convenience command to explore, transform, and analyze csv and parquet files to save you from writing throwaway python scripts or create a custom container image for verys simple tasks. It's designed to make the life of a data engineer a little easier. + +Assume you have a very large csv file, and you just want to translate it to parquet with some type inference and sane defaults. With `dr` this is as easy as: + +``` +$ dr csv wine.csv -P wine.pq +``` + +Parquet files are binary, and you may want to check that you've not written nonsense by printing the header on your terminal. -```bash -$ cat wine.csv | head -n 5 | dr print -shape: (4, 14) +``` +$ dr rpq wine.pq -a +shape: (5, 14) ┌──────┬─────────┬────────────┬──────┬─────┬───────────┬──────┬──────┬─────────┐ │ Wine ┆ Alcohol ┆ Malic.acid ┆ Ash ┆ ... ┆ Color.int ┆ Hue ┆ OD ┆ Proline │ │ --- ┆ --- ┆ --- ┆ --- ┆ ┆ --- ┆ --- ┆ --- ┆ --- │ @@ -78,112 +58,91 @@ shape: (4, 14) │ 1 ┆ 13.16 ┆ 2.36 ┆ 2.67 ┆ ... ┆ 5.68 ┆ 1.03 ┆ 3.17 ┆ 1185 │ ├╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤ │ 1 ┆ 14.37 ┆ 1.95 ┆ 2.5 ┆ ... ┆ 7.8 ┆ 0.86 ┆ 3.45 ┆ 1480 │ +├╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤ +│ 1 ┆ 13.24 ┆ 2.59 ┆ 2.87 ┆ ... ┆ 4.32 ┆ 1.04 ┆ 2.93 ┆ 735 │ └──────┴─────────┴────────────┴──────┴─────┴───────────┴──────┴──────┴─────────┘ ``` -Note that when `dr` loads csv data also tries to guess the data type of each field. - -### Parquet +Maybe the most interesing feature of `dr` is the ability to process csv and parquet files using SQL, while solutions like `xsv` and `csvkit` rely on a rich set of subcommands and options. If you already know SQL, there's no need to read any more documentation to select, filter, or group data. The only thing you need to remember is that the table will be called `this`. The following command outputs a csv of the wine with the highest concentration of alcohol in the popular wine dataset: -`dr` is also useful to translate your csv files to parquet with a single command: - -```bash -$ cat wine.csv | dr wpq wine.pq ``` - -Or explore parquet files - -```bash -$ dr rpq wine.pq | head -n 5 | dr print -shape: (4, 14) + dr csv wine.csv -q "select * from this where Alcohol = max(Alcohol)" | dr print +shape: (1, 14) ┌──────┬─────────┬────────────┬──────┬─────┬───────────┬──────┬──────┬─────────┐ │ Wine ┆ Alcohol ┆ Malic.acid ┆ Ash ┆ ... ┆ Color.int ┆ Hue ┆ OD ┆ Proline │ │ --- ┆ --- ┆ --- ┆ --- ┆ ┆ --- ┆ --- ┆ --- ┆ --- │ │ i64 ┆ f64 ┆ f64 ┆ f64 ┆ ┆ f64 ┆ f64 ┆ f64 ┆ i64 │ ╞══════╪═════════╪════════════╪══════╪═════╪═══════════╪══════╪══════╪═════════╡ -│ 1 ┆ 14.23 ┆ 1.71 ┆ 2.43 ┆ ... ┆ 5.64 ┆ 1.04 ┆ 3.92 ┆ 1065 │ -├╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤ -│ 1 ┆ 13.2 ┆ 1.78 ┆ 2.14 ┆ ... ┆ 4.38 ┆ 1.05 ┆ 3.4 ┆ 1050 │ -├╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤ -│ 1 ┆ 13.16 ┆ 2.36 ┆ 2.67 ┆ ... ┆ 5.68 ┆ 1.03 ┆ 3.17 ┆ 1185 │ -├╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤ -│ 1 ┆ 14.37 ┆ 1.95 ┆ 2.5 ┆ ... ┆ 7.8 ┆ 0.86 ┆ 3.45 ┆ 1480 │ +│ 1 ┆ 14.83 ┆ 1.64 ┆ 2.17 ┆ ... ┆ 5.2 ┆ 1.08 ┆ 2.85 ┆ 1045 │ └──────┴─────────┴────────────┴──────┴─────┴───────────┴──────┴──────┴─────────┘ ``` +If you don't use any option that formats the output of the results, `dr` outputs Arrow's IPC format, meaning that multiple `dr` calls can be efficiently chained with very low overhead. The following script loads one month of NY taxi data and executes two sql queries on the data. -## Performance - -`dr` is implemented in Rust with the goal of achieving the highest possible performance. Take for instance a simple read, groupby, and aggregate operation with ~30MB of data: - -```bash -$ time cat data/walmart_train.csv | dr sql "select Dept, avg("Weekly_Sales") from this group by Dept" | dr print -shape: (81, 2) -┌──────┬──────────────┐ -│ Dept ┆ Weekly_Sales │ -│ --- ┆ --- │ -│ i64 ┆ f64 │ -╞══════╪══════════════╡ -│ 30 ┆ 4118.197208 │ -├╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ -│ 16 ┆ 14245.63827 │ -├╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ -│ 56 ┆ 3833.706211 │ -├╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ -│ 24 ┆ 6353.604562 │ -├╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ -│ ... ┆ ... │ -├╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ -│ 31 ┆ 2339.440287 │ -├╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ -│ 59 ┆ 694.463564 │ -├╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ -│ 27 ┆ 1583.437727 │ -├╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ -│ 77 ┆ 328.9618 │ -└──────┴──────────────┘ - -real 0m0.089s -user 0m0.116s -sys 0m0.036s +``` +$ dr rpq data/yellow_tripdata_2014-01.parquet \ + -q "select count(1) as cnt, passenger_count from this group by passenger_count" \ + | dr sql "select * from this order by cnt desc" \ + | dr print +┌─────────┬─────────────────┐ +│ cnt ┆ passenger_count │ +│ --- ┆ --- │ +│ u32 ┆ i64 │ +╞═════════╪═════════════════╡ +│ 9727321 ┆ 1 │ +├╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ +│ 1891588 ┆ 2 │ +├╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ +│ 789070 ┆ 5 │ +├╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ +│ 566248 ┆ 3 │ +├╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ +│ ... ┆ ... │ +├╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ +│ 19 ┆ 208 │ +├╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ +│ 16 ┆ 9 │ +├╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ +│ 7 ┆ 7 │ +├╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ +│ 5 ┆ 8 │ +└─────────┴─────────────────┘ ``` -Let's compare that with the followint Python script that leverages Pandas to read the data, and compute the aggregation: +## Reference -```python -#!/usr/bin/env python3 +Some commands that generate raw output in ipc format. -import sys -import pandas as pd +* Read a csv or parquet file and print the header: `dr {csv, rpq} [file] -a` +* Read a csv or parquet file, execute a SQL statement, and output the results in stdout using Arrow's ipc format `dr {csv, rpq} [file] -q "statement"` +* Read a csv or parquet file and print a summary of each column: `dr {csv, rpq} [file] -s "[query]"` +* Read a csv or parquet file, execute a query, and output the results in stdout using the csv format `dr {csv, rpq} [file] -s "[query]" -t` +* Read a csv and write a parquet file with the same contents: `dr csv [file.csv] -P [file.pq]` -df = pd.read_csv(sys.stdin) -print(df.groupby("Dept", sort=False, as_index=False).Weekly_Sales.mean()) -``` +Some commands that convert raw input in ipc format + +* Read from stdin in ipc and pretty print the table: `dr print` +* Read from stdin in csv and pretty print the table: `dr print -t` +* Read from stdin in ipc and write the data in parquet: `dr wpq [file.pq]` + +## Performance -```bash -$ time cat data/walmart_train.csv | ./python/group.py - Dept Weekly_Sales -0 1 19213.485088 -1 2 43607.020113 -2 3 11793.698516 -3 4 25974.630238 -4 5 21365.583515 -.. ... ... -76 99 415.487065 -77 39 11.123750 -78 50 2658.897010 -79 43 1.193333 -80 65 45441.706224 - -[81 rows x 2 columns] - -real 0m0.717s -user 0m0.627s -sys 0m0.282s +This command runs two dr processes. The first one makes an aggregation on a compressed parquet file of 144MB of size, and the second one just orders the result: + +``` +$ dr rpq data/yellow_tripdata_2014-01.parquet \ + -q "select count(1) as cnt, passenger_count from this group by passenger_count" \ + | dr sql "select * from this order by cnt desc" \ + > /dev/null ``` -Note that there's roughly a 6x speedup. This considering that this operation in particular is heavily optimized in Pandas and most of the run time is spent in parsing and reading from stdin. +On a very very old machine (Intel(R) Core(TM) i5-6500T CPU @ 2.50GHz), this takes around half a second, which is roughly the time needed to read and decompress the parquet file. Polar's csv and parquet readers have some decent performance, so you can count on `dr` to be one of the fastest in the block. + +## Caveats + +1. `dr` uses Polars to build and transform dataframes in Rust, and the entire table has to be loaded in memory. At the time when `dr` was created, streaming support didn't get along very well with SQL contexts. +2. `dr` uses Polars' SQLContext to execute the query which supports a small subset of the SQL language. ## Built standing on the shoulders of giants diff --git a/python/group.py b/python/group.py deleted file mode 100755 index 8106910..0000000 --- a/python/group.py +++ /dev/null @@ -1,7 +0,0 @@ -#!/usr/bin/env python3 - -import sys -import pandas as pd - -df = pd.read_csv(sys.stdin) -print(df.groupby("Dept", sort=False, as_index=False).Weekly_Sales.mean()) \ No newline at end of file diff --git a/src/io.rs b/src/io.rs index df4972a..e5d5151 100644 --- a/src/io.rs +++ b/src/io.rs @@ -1,22 +1,53 @@ -use polars::frame::DataFrame; use polars::prelude::*; use std::fs; use std::io; use std::io::Read; -/// Read CSV format from stdin and return a Polars DataFrame -pub fn load_csv_from_stdin() -> DataFrame { - let mut buffer = String::new(); - let _res: () = match io::stdin().read_to_string(&mut buffer) { +/// Read CSV file +pub fn read_csv(path: String) -> LazyFrame { + LazyCsvReader::new(path) + .finish() + .expect("Could not load file") +} + +/// Read parquet and return a Polars LazyFrame +pub fn read_parquet(path: String) -> LazyFrame { + LazyFrame::scan_parquet(path, ScanArgsParquet::default()).expect("Could not read parquet file") +} + +/// Read IPC setream +pub fn read_ipc() -> LazyFrame { + let mut buffer = Vec::new(); + let _res: () = match io::stdin().lock().read_to_end(&mut buffer) { Ok(_ok) => (), Err(_e) => (), }; - let cursor = io::Cursor::new(buffer.as_bytes()); - let df = match CsvReader::new(cursor).finish() { - Ok(df) => df, - Err(_e) => DataFrame::default(), + let cursor = io::Cursor::new(buffer); + match IpcStreamReader::new(cursor).finish() { + Ok(df) => df.lazy(), + Err(_e) => LazyFrame::default(), + } +} + +/// Read CSV format from stdin and return a Polars DataFrame +pub fn load_csv_from_stdin() -> LazyFrame { + let mut buffer = Vec::new(); + let _res: () = match io::stdin().lock().read_to_end(&mut buffer) { + Ok(_ok) => (), + Err(_e) => (), }; - df + let cursor = io::Cursor::new(buffer); + match CsvReader::new(cursor).finish() { + Ok(df) => df.lazy(), + Err(_e) => LazyFrame::default(), + } +} + +/// Write to IPC steram +pub fn write_ipc(df: LazyFrame) { + IpcStreamWriter::new(io::stdout().lock()) + .finish(&mut df.collect().expect("Could not collect dataframe")) + .expect("Could not write to stream"); } /// Take a Polars Dataframe and write it as CSV to stdout @@ -27,19 +58,6 @@ pub fn dump_csv_to_stdout(df: &mut DataFrame) { }; } -/// Read parquet and return a Polars DataFrame -pub fn read_parquet(path: String) -> DataFrame { - let file = fs::File::open(path).expect("Could not open file"); - let df = match ParquetReader::new(file).finish() { - Ok(df) => df, - Err(e) => { - eprintln!("{e}"); - DataFrame::default() - } - }; - df -} - /// Write a Polars DataFrame to Parquet pub fn write_parquet( mut df: DataFrame, diff --git a/src/main.rs b/src/main.rs index 4b43185..5353975 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,54 +1,179 @@ mod io; mod sql; -use clap::{arg, command, Command}; +use clap::{arg, command, ArgAction, Command}; fn main() { let matches = command!() + .subcommand( + Command::new("csv") + .about("Read csv, output arrow stream") + .arg(arg!([path] "Path to CSV file")) + .arg(arg!(-d --delimiter "Column delimiter. Assume ,").required(false)) + .arg(arg!(-q --query "Execute query on the file").required(false)) + .arg( + arg!(-s --summary ... "Summarize the data") + .required(false) + .action(ArgAction::SetTrue), + ) + .arg( + arg!(-t --text ... "Output text instead of binary") + .required(false) + .action(ArgAction::SetTrue), + ) + .arg(arg!(-P --parquet "Write output as a parquet file").required(false)) + .arg( + arg!(-a --head ... "Print the header of the table") + .required(false) + .action(ArgAction::SetTrue), + ), + ) .subcommand( Command::new("sql") .about("Runs a sql statement on the file") .arg(arg!([statement] "SQL statement")) + .arg( + arg!(-t --text ... "Input text instead of binary") + .required(false) + .action(ArgAction::SetTrue), + ) .arg(arg!(-d --delimiter "Column delimiter").required(false)), ) - .subcommand(Command::new("print").about("Pretty prints the table")) + .subcommand( + Command::new("print").about("Pretty prints the table").arg( + arg!(-t --text ... "Inputs csv instead of binary") + .required(false) + .action(ArgAction::SetTrue), + ), + ) .subcommand( Command::new("rpq") .about("Read parquet file") - .arg(arg!([path] "Path to the parquet file")), + .arg(arg!([path] "Path to the parquet file")) + .arg(arg!(-q --query "Execute query on the file").required(false)) + .arg( + arg!(-s --summary ... "Summarize the data") + .required(false) + .action(ArgAction::SetTrue), + ) + .arg( + arg!(-t --text ... "Output text instead of binary") + .required(false) + .action(ArgAction::SetTrue), + ) + .arg(arg!(-P --text "Write the result as a parquet file").required(false)) + .arg( + arg!(-a --head ... "Print the header of the table") + .required(false) + .action(ArgAction::SetTrue), + ), ) .subcommand( Command::new("wpq") .about("Write to a paquet file") + .arg( + arg!(-t --text ... "Output text instead of binary") + .required(false) + .action(ArgAction::SetTrue), + ) .arg(arg!([path] "Path to the new parquet file")), ) .get_matches(); - if let Some(matches) = matches.subcommand_matches("sql") { - //if let Some(delimiter) = matches.get_one::("delimiter") { - // println!("DEBUG: Delimiter: {delimiter}") - //} else { - // println!("DEBUG: No delimiter") - //} - if let Some(statement) = matches.get_one::("statement") { - sql::execute(statement); + if let Some(_matches) = matches.subcommand_matches("csv") { + if let Some(path) = _matches.get_one::("path") { + let mut ldf = io::read_csv(path.to_string()); + if let Some(query) = _matches.get_one::("query") { + ldf = sql::execute(ldf, query); + } + if _matches.get_flag("summary") { + let df = ldf.collect().expect("Could not collect"); + println!("{}", df.describe(None)); + } else if _matches.get_flag("head") { + let df = ldf.fetch(5).expect("Could not fetch"); + println!("{}", df) + } else { + if _matches.get_flag("text") { + io::dump_csv_to_stdout(&mut ldf.collect().expect("Could not collect")); + } else { + if let Some(path) = _matches.get_one::("parquet") { + io::write_parquet( + ldf.collect().expect("Could not collect"), + path.to_string(), + "lz4raw".to_string(), + true, + Some(0), + ); + } else { + io::write_ipc(ldf); + } + } + } + } + } else if let Some(_matches) = matches.subcommand_matches("sql") { + if let Some(statement) = _matches.get_one::("statement") { + let ldf = if _matches.get_flag("text") { + io::load_csv_from_stdin() + } else { + io::read_ipc() + }; + let res = sql::execute(ldf, statement); + io::write_ipc(res); } else { - let mut df = io::load_csv_from_stdin(); - io::dump_csv_to_stdout(&mut df); + io::write_ipc(io::read_ipc()); } } else if let Some(_matches) = matches.subcommand_matches("print") { - let df = io::load_csv_from_stdin(); - println!("{}", df) - } else if let Some(matches) = matches.subcommand_matches("rpq") { - if let Some(path) = matches.get_one::("path") { - let mut df = io::read_parquet(path.to_string()); - io::dump_csv_to_stdout(&mut df); + let df = if _matches.get_flag("text") { + io::load_csv_from_stdin() + } else { + io::read_ipc() + }; + println!("{}", df.collect().expect("Could not collect")); + } else if let Some(_matches) = matches.subcommand_matches("rpq") { + if let Some(path) = _matches.get_one::("path") { + let mut ldf = io::read_parquet(path.to_string()); + if let Some(query) = _matches.get_one::("query") { + ldf = sql::execute(ldf, query); + } + if _matches.get_flag("summary") { + let df = ldf.collect().expect("Could not collect"); + println!("{}", df.describe(None)); + } else if _matches.get_flag("head") { + let df = ldf.fetch(5).expect("Could not fetch"); + println!("{}", df) + } else { + if _matches.get_flag("text") { + io::dump_csv_to_stdout(&mut ldf.collect().expect("Could not collect")); + } else { + if let Some(path) = _matches.get_one::("parquet") { + io::write_parquet( + ldf.collect().expect("Could not collect"), + path.to_string(), + "lz4raw".to_string(), + true, + Some(0), + ); + } else { + io::write_ipc(ldf); + } + } + } } else { eprintln!("File not found") } - } else if let Some(matches) = matches.subcommand_matches("wpq") { - if let Some(path) = matches.get_one::("path") { - let df = io::load_csv_from_stdin(); - io::write_parquet(df, path.to_string(), "lz4raw".to_string(), true, Some(0)); + } else if let Some(_matches) = matches.subcommand_matches("wpq") { + if let Some(path) = _matches.get_one::("path") { + let df = if _matches.get_flag("text") { + io::load_csv_from_stdin() + } else { + io::read_ipc() + }; + io::write_parquet( + df.collect().expect("Could not collect"), + path.to_string(), + "lz4raw".to_string(), + true, + Some(0), + ); } else { eprintln!("Could now write to parquet"); } diff --git a/src/sql.rs b/src/sql.rs index 9741e80..e908ad8 100644 --- a/src/sql.rs +++ b/src/sql.rs @@ -1,19 +1,15 @@ -use crate::io::dump_csv_to_stdout; -use crate::io::load_csv_from_stdin; -use polars_lazy::frame::IntoLazy; -use polars_sql::SQLContext; +use polars::prelude::LazyFrame; +use polars::sql::SQLContext; -pub fn execute(statement: &String) { - if let Ok(mut context) = SQLContext::try_new() { - let df = load_csv_from_stdin(); - context.register("this", df.lazy()); - if let Ok(res) = context.execute(statement) { - if let Ok(mut res) = res.collect() { - dump_csv_to_stdout(&mut res); - }; - }; - if let Err(e) = context.execute(statement) { - eprintln!("Query execution error {e}") - }; - }; +pub fn execute(ldf: LazyFrame, statement: &String) -> LazyFrame { + let mut context = SQLContext::try_new().expect("Could not create context"); + context.register("this", ldf); + + match context.execute(statement) { + Ok(res) => res, + Err(e) => { + eprintln!("Query execution error {e}"); + LazyFrame::default() + } + } }