diff --git a/Cargo.toml b/Cargo.toml index 342cc7e..1f9a55b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "dr" description = "Command-line data file processing in Rust" -version = "0.5.2" +version = "0.5.3" edition = "2021" include = [ "**/*.rs", @@ -15,6 +15,6 @@ repository = "https://git.guillemborrell.es/guillem/dr" [dependencies] clap = {version = "4.0", features = ["cargo"]} polars-lazy = {"version" = "0.26", "features" = ["parquet", "ipc", "csv-file"]} -polars-core = {"version" = "0.26", "features" = ["describe"]} +polars-core = {"version" = "0.26", "features" = ["describe", "fmt"]} polars-io = {"version" = "0.26", "features" = ["ipc_streaming"]} polars-sql = {"version" = "0.2.2"} diff --git a/src/io.rs b/src/io.rs index 191a307..9eba78c 100644 --- a/src/io.rs +++ b/src/io.rs @@ -44,6 +44,20 @@ pub fn load_csv_from_stdin() -> LazyFrame { } } +/// Read Parquet format from stdin and return a Polars LazyFrame +pub fn load_parquet_from_stdin() -> LazyFrame { + let mut buffer = Vec::new(); + let _res: () = match io::stdin().lock().read_to_end(&mut buffer) { + Ok(_ok) => (), + Err(_e) => (), + }; + let cursor = io::Cursor::new(buffer); + match ParquetReader::new(cursor).finish() { + Ok(df) => df.lazy(), + Err(_e) => LazyFrame::default(), + } +} + /// Write to IPC steram pub fn write_ipc(df: LazyFrame) { IpcStreamWriter::new(io::stdout().lock()) @@ -75,5 +89,6 @@ pub fn write_parquet(ldf: LazyFrame, path: String) { data_pagesize_limit: None, maintain_order: false, }, - ).expect("Could not save"); + ) + .expect("Could not save"); } diff --git a/src/main.rs b/src/main.rs index 5e048d7..d038ef8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,7 @@ mod io; mod sql; use clap::{arg, command, ArgAction, Command}; +use polars_lazy::prelude::*; fn main() { let matches = command!() @@ -55,12 +56,20 @@ fn main() { .required(false) .action(ArgAction::SetTrue), ) + .arg( + arg!(-i --stdin ... 
"Read from stdin instead than from a file") + .required(false) + .action(ArgAction::SetTrue), + ) .arg( arg!(-t --text ... "Output text instead of binary") .required(false) .action(ArgAction::SetTrue), ) - .arg(arg!(-P --text "Write the result as a parquet file").required(false)) + .arg( + arg!(-P --parquet "Write the result as a parquet file") + .required(false), + ) .arg( arg!(-a --head ... "Print the header of the table") .required(false) @@ -123,30 +132,33 @@ fn main() { }; println!("{}", df.collect().expect("Could not collect")); } else if let Some(_matches) = matches.subcommand_matches("rpq") { - if let Some(path) = _matches.get_one::("path") { - let mut ldf = io::read_parquet(path.to_string()); - if let Some(query) = _matches.get_one::("query") { - ldf = sql::execute(ldf, query); - } - if _matches.get_flag("summary") { - let df = ldf.collect().expect("Could not collect"); - println!("{}", df.describe(None)); - } else if _matches.get_flag("head") { - let df = ldf.fetch(5).expect("Could not fetch"); - println!("{}", df) + let mut ldf = LazyFrame::default(); + if _matches.get_flag("stdin") { + ldf = io::load_parquet_from_stdin(); + } else if let Some(path) = _matches.get_one::("path") { + ldf = io::read_parquet(path.to_string()); + } else { + eprintln!("File not found or not reading from stdin") + } + if let Some(query) = _matches.get_one::("query") { + ldf = sql::execute(ldf, query); + } + if _matches.get_flag("summary") { + let df = ldf.collect().expect("Could not collect"); + println!("{}", df.describe(None)); + } else if _matches.get_flag("head") { + let df = ldf.fetch(5).expect("Could not fetch"); + println!("{}", df) + } else { + if _matches.get_flag("text") { + io::dump_csv_to_stdout(ldf); } else { - if _matches.get_flag("text") { - io::dump_csv_to_stdout(ldf); + if let Some(path) = _matches.get_one::("parquet") { + io::write_parquet(ldf, path.to_string()); } else { - if let Some(path) = _matches.get_one::("parquet") { - io::write_parquet(ldf, 
path.to_string()); - } else { - io::write_ipc(ldf); - } + io::write_ipc(ldf); } } - } else { - eprintln!("File not found") } } else if let Some(_matches) = matches.subcommand_matches("wpq") { if let Some(path) = _matches.get_one::("path") {