Bring back parquet from stdin. Fixes.
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/tag/woodpecker Pipeline was successful

This commit is contained in:
Guillem Borrell 2022-12-26 12:54:17 +00:00
parent 2b18f7b5e3
commit fc063601c5
3 changed files with 51 additions and 24 deletions

View file

@ -1,7 +1,7 @@
[package] [package]
name = "dr" name = "dr"
description = "Command-line data file processing in Rust" description = "Command-line data file processing in Rust"
version = "0.5.2" version = "0.5.3"
edition = "2021" edition = "2021"
include = [ include = [
"**/*.rs", "**/*.rs",
@ -15,6 +15,6 @@ repository = "https://git.guillemborrell.es/guillem/dr"
[dependencies] [dependencies]
clap = {version = "4.0", features = ["cargo"]} clap = {version = "4.0", features = ["cargo"]}
polars-lazy = {"version" = "0.26", "features" = ["parquet", "ipc", "csv-file"]} polars-lazy = {"version" = "0.26", "features" = ["parquet", "ipc", "csv-file"]}
polars-core = {"version" = "0.26", "features" = ["describe"]} polars-core = {"version" = "0.26", "features" = ["describe", "fmt"]}
polars-io = {"version" = "0.26", "features" = ["ipc_streaming"]} polars-io = {"version" = "0.26", "features" = ["ipc_streaming"]}
polars-sql = {"version" = "0.2.2"} polars-sql = {"version" = "0.2.2"}

View file

@ -44,6 +44,20 @@ pub fn load_csv_from_stdin() -> LazyFrame {
} }
} }
/// Read CSV format from stdin and return a Polars DataFrame
pub fn load_parquet_from_stdin() -> LazyFrame {
let mut buffer = Vec::new();
let _res: () = match io::stdin().lock().read_to_end(&mut buffer) {
Ok(_ok) => (),
Err(_e) => (),
};
let cursor = io::Cursor::new(buffer);
match ParquetReader::new(cursor).finish() {
Ok(df) => df.lazy(),
Err(_e) => LazyFrame::default(),
}
}
/// Write to IPC steram /// Write to IPC steram
pub fn write_ipc(df: LazyFrame) { pub fn write_ipc(df: LazyFrame) {
IpcStreamWriter::new(io::stdout().lock()) IpcStreamWriter::new(io::stdout().lock())
@ -75,5 +89,6 @@ pub fn write_parquet(ldf: LazyFrame, path: String) {
data_pagesize_limit: None, data_pagesize_limit: None,
maintain_order: false, maintain_order: false,
}, },
).expect("Could not save"); )
.expect("Could not save");
} }

View file

@ -1,6 +1,7 @@
mod io; mod io;
mod sql; mod sql;
use clap::{arg, command, ArgAction, Command}; use clap::{arg, command, ArgAction, Command};
use polars_lazy::prelude::*;
fn main() { fn main() {
let matches = command!() let matches = command!()
@ -55,12 +56,20 @@ fn main() {
.required(false) .required(false)
.action(ArgAction::SetTrue), .action(ArgAction::SetTrue),
) )
.arg(
arg!(-i --stdin ... "Read from stdin instead than from a file")
.required(false)
.action(ArgAction::SetTrue),
)
.arg( .arg(
arg!(-t --text ... "Output text instead of binary") arg!(-t --text ... "Output text instead of binary")
.required(false) .required(false)
.action(ArgAction::SetTrue), .action(ArgAction::SetTrue),
) )
.arg(arg!(-P --text <String> "Write the result as a parquet file").required(false)) .arg(
arg!(-P --parquet <String> "Write the result as a parquet file")
.required(false),
)
.arg( .arg(
arg!(-a --head ... "Print the header of the table") arg!(-a --head ... "Print the header of the table")
.required(false) .required(false)
@ -123,30 +132,33 @@ fn main() {
}; };
println!("{}", df.collect().expect("Could not collect")); println!("{}", df.collect().expect("Could not collect"));
} else if let Some(_matches) = matches.subcommand_matches("rpq") { } else if let Some(_matches) = matches.subcommand_matches("rpq") {
if let Some(path) = _matches.get_one::<String>("path") { let mut ldf = LazyFrame::default();
let mut ldf = io::read_parquet(path.to_string()); if _matches.get_flag("stdin") {
if let Some(query) = _matches.get_one::<String>("query") { ldf = io::load_parquet_from_stdin();
ldf = sql::execute(ldf, query); } else if let Some(path) = _matches.get_one::<String>("path") {
} ldf = io::read_parquet(path.to_string());
if _matches.get_flag("summary") { } else {
let df = ldf.collect().expect("Could not collect"); eprintln!("File not found or not reading from stdin")
println!("{}", df.describe(None)); }
} else if _matches.get_flag("head") { if let Some(query) = _matches.get_one::<String>("query") {
let df = ldf.fetch(5).expect("Could not fetch"); ldf = sql::execute(ldf, query);
println!("{}", df) }
if _matches.get_flag("summary") {
let df = ldf.collect().expect("Could not collect");
println!("{}", df.describe(None));
} else if _matches.get_flag("head") {
let df = ldf.fetch(5).expect("Could not fetch");
println!("{}", df)
} else {
if _matches.get_flag("text") {
io::dump_csv_to_stdout(ldf);
} else { } else {
if _matches.get_flag("text") { if let Some(path) = _matches.get_one::<String>("parquet") {
io::dump_csv_to_stdout(ldf); io::write_parquet(ldf, path.to_string());
} else { } else {
if let Some(path) = _matches.get_one::<String>("parquet") { io::write_ipc(ldf);
io::write_parquet(ldf, path.to_string());
} else {
io::write_ipc(ldf);
}
} }
} }
} else {
eprintln!("File not found")
} }
} else if let Some(_matches) = matches.subcommand_matches("wpq") { } else if let Some(_matches) = matches.subcommand_matches("wpq") {
if let Some(path) = _matches.get_one::<String>("path") { if let Some(path) = _matches.get_one::<String>("path") {