diff --git a/Cargo.toml b/Cargo.toml index 9024d51..342cc7e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "dr" description = "Command-line data file processing in Rust" -version = "0.5.1" +version = "0.5.2" edition = "2021" include = [ "**/*.rs", @@ -14,5 +14,7 @@ repository = "https://git.guillemborrell.es/guillem/dr" [dependencies] clap = {version = "4.0", features = ["cargo"]} -polars-lazy = {"version" = "0.25"} -polars = {"version" = "0.25", features = ["sql", "parquet", "decompress", "ipc", "ipc_streaming", "docs-selection"]} +polars-lazy = {"version" = "0.26", "features" = ["parquet", "ipc", "csv-file"]} +polars-core = {"version" = "0.26", "features" = ["describe"]} +polars-io = {"version" = "0.26", "features" = ["ipc_streaming"]} +polars-sql = {"version" = "0.2.2"} diff --git a/src/io.rs b/src/io.rs index e5d5151..191a307 100644 --- a/src/io.rs +++ b/src/io.rs @@ -1,7 +1,8 @@ -use polars::prelude::*; -use std::fs; +use polars_io::prelude::*; +use polars_lazy::prelude::*; use std::io; use std::io::Read; +use std::path::PathBuf; /// Read CSV file pub fn read_csv(path: String) -> LazyFrame { @@ -51,36 +52,28 @@ pub fn write_ipc(df: LazyFrame) { } /// Take a Polars Dataframe and write it as CSV to stdout -pub fn dump_csv_to_stdout(df: &mut DataFrame) { - let _res: () = match CsvWriter::new(io::stdout().lock()).finish(df) { +pub fn dump_csv_to_stdout(ldf: LazyFrame) { + let _res: () = match CsvWriter::new(io::stdout().lock()) + .finish(&mut ldf.collect().expect("Could not collect")) + { Ok(_ok) => (), Err(_e) => (), }; } /// Write a Polars DataFrame to Parquet -pub fn write_parquet( - mut df: DataFrame, - path: String, - compression: String, - statistics: bool, - chunksize: Option, -) { +pub fn write_parquet(ldf: LazyFrame, path: String) { // Selected compression not implemented yet - let mut _file = match fs::File::create(path) { - Ok(mut file) => { - let mut w = ParquetWriter::new(&mut file); - if statistics { - w = w.with_statistics(statistics); - } - if chunksize.unwrap_or(0) > 0 { - w = w.with_row_group_size(chunksize); - } - let _r = match w.finish(&mut df) { - Ok(_r) => (), - Err(e) => eprintln!("{e}"), - }; - } - Err(e) => eprintln!("{e}"), - }; + let mut p = PathBuf::new(); + p.push(path); + ldf.sink_parquet( + p, + ParquetWriteOptions { + compression: ParquetCompression::Snappy, + statistics: true, + row_group_size: None, + data_pagesize_limit: None, + maintain_order: false, + }, + ).expect("Could not save"); } diff --git a/src/main.rs b/src/main.rs index 5353975..5e048d7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -93,16 +93,10 @@ fn main() { println!("{}", df) } else { if _matches.get_flag("text") { - io::dump_csv_to_stdout(&mut ldf.collect().expect("Could not collect")); + io::dump_csv_to_stdout(ldf); } else { if let Some(path) = _matches.get_one::("parquet") { - io::write_parquet( - ldf.collect().expect("Could not collect"), - path.to_string(), - "lz4raw".to_string(), - true, - Some(0), - ); + io::write_parquet(ldf, path.to_string()); } else { io::write_ipc(ldf); } @@ -142,16 +136,10 @@ fn main() { println!("{}", df) } else { if _matches.get_flag("text") { - io::dump_csv_to_stdout(&mut ldf.collect().expect("Could not collect")); + io::dump_csv_to_stdout(ldf); } else { if let Some(path) = _matches.get_one::("parquet") { - io::write_parquet( - ldf.collect().expect("Could not collect"), - path.to_string(), - "lz4raw".to_string(), - true, - Some(0), - ); + io::write_parquet(ldf, path.to_string()); } else { io::write_ipc(ldf); } @@ -162,18 +150,12 @@ fn main() { } } else if let Some(_matches) = matches.subcommand_matches("wpq") { if let Some(path) = _matches.get_one::("path") { - let df = if _matches.get_flag("text") { + let ldf = if _matches.get_flag("text") { io::load_csv_from_stdin() } else { io::read_ipc() }; - io::write_parquet( - df.collect().expect("Could not collect"), - path.to_string(), - "lz4raw".to_string(), - true, - Some(0), - ); + io::write_parquet(ldf, path.to_string()); } else { eprintln!("Could now write to parquet"); } diff --git a/src/sql.rs b/src/sql.rs index 2b90522..087ef2f 100644 --- a/src/sql.rs +++ b/src/sql.rs @@ -1,15 +1,10 @@ -use polars::sql::SQLContext; -use polars_lazy::frame::LazyFrame; +use polars_sql::SQLContext; +use polars_lazy::prelude::LazyFrame; pub fn execute(ldf: LazyFrame, statement: &String) -> LazyFrame { let mut context = SQLContext::try_new().expect("Could not create context"); context.register("this", ldf); - - match context.execute(statement) { - Ok(res) => res, - Err(e) => { - eprintln!("Query execution error {e}"); - LazyFrame::default() - } - } + context + .execute(statement) + .expect("Could not execute statement") }