Multiple improvements
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/tag/woodpecker Pipeline was successful

This commit is contained in:
Guillem Borrell 2022-12-25 23:07:50 +00:00
parent 3af97c71f0
commit 2b18f7b5e3
4 changed files with 36 additions and 64 deletions

View file

@ -1,7 +1,7 @@
[package]
name = "dr"
description = "Command-line data file processing in Rust"
version = "0.5.1"
version = "0.5.2"
edition = "2021"
include = [
"**/*.rs",
@ -14,5 +14,7 @@ repository = "https://git.guillemborrell.es/guillem/dr"
[dependencies]
clap = {version = "4.0", features = ["cargo"]}
polars-lazy = {"version" = "0.25"}
polars = {"version" = "0.25", features = ["sql", "parquet", "decompress", "ipc", "ipc_streaming", "docs-selection"]}
polars-lazy = {"version" = "0.26", "features" = ["parquet", "ipc", "csv-file"]}
polars-core = {"version" = "0.26", "features" = ["describe"]}
polars-io = {"version" = "0.26", "features" = ["ipc_streaming"]}
polars-sql = {"version" = "0.2.2"}

View file

@ -1,7 +1,8 @@
use polars::prelude::*;
use std::fs;
use polars_io::prelude::*;
use polars_lazy::prelude::*;
use std::io;
use std::io::Read;
use std::path::PathBuf;
/// Read CSV file
pub fn read_csv(path: String) -> LazyFrame {
@ -51,36 +52,28 @@ pub fn write_ipc(df: LazyFrame) {
}
/// Take a Polars Dataframe and write it as CSV to stdout
pub fn dump_csv_to_stdout(df: &mut DataFrame) {
let _res: () = match CsvWriter::new(io::stdout().lock()).finish(df) {
pub fn dump_csv_to_stdout(ldf: LazyFrame) {
let _res: () = match CsvWriter::new(io::stdout().lock())
.finish(&mut ldf.collect().expect("Could not collect"))
{
Ok(_ok) => (),
Err(_e) => (),
};
}
/// Write a Polars DataFrame to Parquet
pub fn write_parquet(
mut df: DataFrame,
path: String,
compression: String,
statistics: bool,
chunksize: Option<usize>,
) {
pub fn write_parquet(ldf: LazyFrame, path: String) {
// Selected compression not implemented yet
let mut _file = match fs::File::create(path) {
Ok(mut file) => {
let mut w = ParquetWriter::new(&mut file);
if statistics {
w = w.with_statistics(statistics);
}
if chunksize.unwrap_or(0) > 0 {
w = w.with_row_group_size(chunksize);
}
let _r = match w.finish(&mut df) {
Ok(_r) => (),
Err(e) => eprintln!("{e}"),
};
}
Err(e) => eprintln!("{e}"),
};
let mut p = PathBuf::new();
p.push(path);
ldf.sink_parquet(
p,
ParquetWriteOptions {
compression: ParquetCompression::Snappy,
statistics: true,
row_group_size: None,
data_pagesize_limit: None,
maintain_order: false,
},
).expect("Could not save");
}

View file

@ -93,16 +93,10 @@ fn main() {
println!("{}", df)
} else {
if _matches.get_flag("text") {
io::dump_csv_to_stdout(&mut ldf.collect().expect("Could not collect"));
io::dump_csv_to_stdout(ldf);
} else {
if let Some(path) = _matches.get_one::<String>("parquet") {
io::write_parquet(
ldf.collect().expect("Could not collect"),
path.to_string(),
"lz4raw".to_string(),
true,
Some(0),
);
io::write_parquet(ldf, path.to_string());
} else {
io::write_ipc(ldf);
}
@ -142,16 +136,10 @@ fn main() {
println!("{}", df)
} else {
if _matches.get_flag("text") {
io::dump_csv_to_stdout(&mut ldf.collect().expect("Could not collect"));
io::dump_csv_to_stdout(ldf);
} else {
if let Some(path) = _matches.get_one::<String>("parquet") {
io::write_parquet(
ldf.collect().expect("Could not collect"),
path.to_string(),
"lz4raw".to_string(),
true,
Some(0),
);
io::write_parquet(ldf, path.to_string());
} else {
io::write_ipc(ldf);
}
@ -162,18 +150,12 @@ fn main() {
}
} else if let Some(_matches) = matches.subcommand_matches("wpq") {
if let Some(path) = _matches.get_one::<String>("path") {
let df = if _matches.get_flag("text") {
let ldf = if _matches.get_flag("text") {
io::load_csv_from_stdin()
} else {
io::read_ipc()
};
io::write_parquet(
df.collect().expect("Could not collect"),
path.to_string(),
"lz4raw".to_string(),
true,
Some(0),
);
io::write_parquet(ldf, path.to_string());
} else {
eprintln!("Could now write to parquet");
}

View file

@ -1,15 +1,10 @@
use polars::sql::SQLContext;
use polars_lazy::frame::LazyFrame;
use polars_sql::SQLContext;
use polars_lazy::prelude::LazyFrame;
pub fn execute(ldf: LazyFrame, statement: &String) -> LazyFrame {
let mut context = SQLContext::try_new().expect("Could not create context");
context.register("this", ldf);
match context.execute(statement) {
Ok(res) => res,
Err(e) => {
eprintln!("Query execution error {e}");
LazyFrame::default()
}
}
context
.execute(statement)
.expect("Could not execute statement")
}