dr/src/io.rs
2023-01-18 09:50:02 +00:00

106 lines
3.1 KiB
Rust

use polars_io::prelude::*;
use polars_lazy::prelude::*;
use std::io;
use std::io::Read;
use std::path::PathBuf;
/// Lazily scan the delimited text file at `path`, splitting fields on `delimiter`.
///
/// Schema inference is given `None` as its row limit, which asks polars to
/// look at every row rather than a fixed-size prefix.
///
/// # Panics
/// Panics with "Could not load file" if the lazy reader cannot be built.
pub fn read_csv(path: String, delimiter: u8) -> LazyFrame {
    let reader = LazyCsvReader::new(path)
        .with_delimiter(delimiter)
        .with_infer_schema_length(None);
    reader.finish().expect("Could not load file")
}
/// Lazily scan the parquet file at `path` with default scan arguments.
///
/// Nothing is read until the returned `LazyFrame` is collected.
///
/// # Panics
/// Panics with "Could not read parquet file" if the scan cannot be set up.
pub fn read_parquet(path: String) -> LazyFrame {
    let scan_args = ScanArgsParquet::default();
    let scanned = LazyFrame::scan_parquet(path, scan_args);
    scanned.expect("Could not read parquet file")
}
/// Read an Arrow IPC stream from stdin and return it as a `LazyFrame`.
///
/// All of stdin is buffered in memory before decoding. Failures are not
/// fatal:
/// - if stdin cannot be read, a diagnostic is printed to stderr and decoding
///   is attempted on whatever bytes were buffered;
/// - if decoding fails, a diagnostic is printed to stderr and
///   `LazyFrame::default()` is returned.
pub fn read_ipc() -> LazyFrame {
    let mut buffer = Vec::new();
    // Don't abort on a read error, but don't hide it either: a partial read
    // would otherwise surface only as a confusing decode failure later.
    if let Err(e) = io::stdin().lock().read_to_end(&mut buffer) {
        eprintln!("Could not read IPC stream from stdin: {}", e);
    }
    match IpcStreamReader::new(io::Cursor::new(buffer)).finish() {
        Ok(df) => df.lazy(),
        Err(e) => {
            eprintln!("Could not decode IPC stream from stdin: {}", e);
            LazyFrame::default()
        }
    }
}
/// Read CSV data from stdin, splitting fields on `delimiter`, and return it
/// as a `LazyFrame`.
///
/// All of stdin is buffered in memory and parsed eagerly. Failures are not
/// fatal:
/// - a stdin read error is reported on stderr, and parsing is attempted on
///   whatever bytes were buffered;
/// - a parse error is reported on stderr, and `LazyFrame::default()` is
///   returned.
pub fn load_csv_from_stdin(delimiter: u8) -> LazyFrame {
    let mut buffer = Vec::new();
    // Keep going with whatever was buffered, but surface the failure.
    if let Err(e) = io::stdin().lock().read_to_end(&mut buffer) {
        eprintln!("Could not read CSV from stdin: {}", e);
    }
    // NOTE(review): unlike `read_csv`, which passes `None` as the schema
    // inference length, this uses the reader's default inference setting —
    // confirm the two paths are meant to infer dtypes differently.
    match CsvReader::new(io::Cursor::new(buffer))
        .with_delimiter(delimiter)
        .finish()
    {
        Ok(df) => df.lazy(),
        Err(e) => {
            eprintln!("Could not parse CSV from stdin: {}", e);
            LazyFrame::default()
        }
    }
}
/// Read a parquet file from stdin and return it as a `LazyFrame`.
///
/// Stdin is buffered fully into memory and wrapped in a `Cursor`, which gives
/// the parquet reader a seekable source (stdin itself is not seekable).
/// Failures are not fatal:
/// - a stdin read error is reported on stderr, and decoding is attempted on
///   whatever bytes were buffered;
/// - a decode error is reported on stderr, and `LazyFrame::default()` is
///   returned.
pub fn load_parquet_from_stdin() -> LazyFrame {
    let mut buffer = Vec::new();
    // Keep going with whatever was buffered, but surface the failure.
    if let Err(e) = io::stdin().lock().read_to_end(&mut buffer) {
        eprintln!("Could not read parquet from stdin: {}", e);
    }
    match ParquetReader::new(io::Cursor::new(buffer)).finish() {
        Ok(df) => df.lazy(),
        Err(e) => {
            eprintln!("Could not decode parquet from stdin: {}", e);
            LazyFrame::default()
        }
    }
}
/// Execute the query `df` and write the result to stdout as an Arrow IPC stream.
///
/// # Panics
/// Panics with "Could not collect dataframe" if the query fails, or with
/// "Could not write to stream" if writing to stdout fails.
pub fn write_ipc(df: LazyFrame) {
    let mut writer = IpcStreamWriter::new(io::stdout().lock());
    let mut collected = df.collect().expect("Could not collect dataframe");
    writer
        .finish(&mut collected)
        .expect("Could not write to stream");
}
/// Execute the query `ldf` and write the result to stdout as CSV, using the
/// CSV writer's default settings.
///
/// Any error returned by the CSV writer is discarded.
///
/// # Panics
/// Panics with "Could not collect" if the query fails.
pub fn dump_csv_to_stdout(ldf: LazyFrame) {
    let mut writer = CsvWriter::new(io::stdout().lock());
    let mut collected = ldf.collect().expect("Could not collect");
    // NOTE(review): the write result is intentionally ignored — presumably so
    // a closed downstream pipe (e.g. `| head`) doesn't abort the program, but
    // every other write failure is lost as well; confirm this is desired.
    let _ = writer.finish(&mut collected);
}
/// Stream the result of the query `ldf` into a parquet file at `path` via
/// `LazyFrame::sink_parquet`.
/// Not yet supported in standard executor.
///
/// The output is always Snappy-compressed with column statistics enabled,
/// and row order is not guaranteed to be maintained (`maintain_order: false`).
///
/// # Panics
/// Panics with "Could not save" if the sink fails.
pub fn sink_parquet(ldf: LazyFrame, path: String) {
    let target = PathBuf::from(path);
    // Selected compression not implemented yet
    let options = ParquetWriteOptions {
        compression: ParquetCompression::Snappy,
        statistics: true,
        row_group_size: None,
        data_pagesize_limit: None,
        maintain_order: false,
    };
    ldf.sink_parquet(target, options).expect("Could not save");
}
/// Execute the query `ldf` and write the result to a parquet file at `path`,
/// using the parquet writer's default settings.
///
/// The query is collected *before* the output file is created. This ordering
/// matters because `File::create` truncates an existing file. Creating it
/// first would empty the input when `ldf` lazily scans the same path (e.g. a
/// frame from `read_parquet(path)`), and would leave an empty file behind
/// whenever the query fails.
///
/// # Panics
/// Panics if the query cannot be collected, the file cannot be created, or
/// the parquet data cannot be written.
pub fn write_parquet(ldf: LazyFrame, path: String) {
    // Selected compression not implemented yet
    let mut collected = ldf.collect().expect("Could not collect");
    let mut file = std::fs::File::create(&path)
        .unwrap_or_else(|e| panic!("Could not create file {}: {}", path, e));
    ParquetWriter::new(&mut file)
        .finish(&mut collected)
        .expect("Could not save");
}