dr/src/io.rs

96 lines
2.7 KiB
Rust
Raw Normal View History

2022-12-26 00:07:50 +01:00
use polars_io::prelude::*;
use polars_lazy::prelude::*;
2022-11-20 01:34:04 +01:00
use std::io;
use std::io::Read;
2022-12-26 00:07:50 +01:00
use std::path::PathBuf;
2022-11-20 01:34:04 +01:00
2022-12-06 15:16:39 +01:00
/// Read CSV file
2023-01-17 15:48:05 +01:00
pub fn read_csv(path: String, delimiter: u8) -> LazyFrame {
2022-12-06 15:16:39 +01:00
LazyCsvReader::new(path)
2023-01-17 15:48:05 +01:00
.with_delimiter(delimiter)
2022-12-06 15:16:39 +01:00
.finish()
.expect("Could not load file")
}
/// Lazily scan the Parquet file at `path` with default scan arguments.
///
/// # Panics
///
/// Panics with "Could not read parquet file" if the scan cannot be created.
pub fn read_parquet(path: String) -> LazyFrame {
    let scan_args = ScanArgsParquet::default();
    let scanned = LazyFrame::scan_parquet(path, scan_args);
    scanned.expect("Could not read parquet file")
}
/// Read IPC setream
pub fn read_ipc() -> LazyFrame {
let mut buffer = Vec::new();
let _res: () = match io::stdin().lock().read_to_end(&mut buffer) {
2022-11-20 01:34:04 +01:00
Ok(_ok) => (),
Err(_e) => (),
};
2022-12-06 15:16:39 +01:00
let cursor = io::Cursor::new(buffer);
match IpcStreamReader::new(cursor).finish() {
Ok(df) => df.lazy(),
Err(_e) => LazyFrame::default(),
}
}
/// Read CSV format from stdin and return a Polars DataFrame
2023-01-17 15:48:05 +01:00
pub fn load_csv_from_stdin(delimiter: u8) -> LazyFrame {
2022-12-06 15:16:39 +01:00
let mut buffer = Vec::new();
let _res: () = match io::stdin().lock().read_to_end(&mut buffer) {
Ok(_ok) => (),
Err(_e) => (),
2022-11-20 01:34:04 +01:00
};
2022-12-06 15:16:39 +01:00
let cursor = io::Cursor::new(buffer);
2023-01-17 15:48:05 +01:00
match CsvReader::new(cursor).with_delimiter(delimiter).finish() {
2022-12-06 15:16:39 +01:00
Ok(df) => df.lazy(),
Err(_e) => LazyFrame::default(),
}
}
2022-12-26 13:54:17 +01:00
/// Read CSV format from stdin and return a Polars DataFrame
pub fn load_parquet_from_stdin() -> LazyFrame {
let mut buffer = Vec::new();
let _res: () = match io::stdin().lock().read_to_end(&mut buffer) {
Ok(_ok) => (),
Err(_e) => (),
};
let cursor = io::Cursor::new(buffer);
match ParquetReader::new(cursor).finish() {
Ok(df) => df.lazy(),
Err(_e) => LazyFrame::default(),
}
}
2022-12-06 15:16:39 +01:00
/// Collect `df` and write the result to stdout as an IPC stream.
///
/// # Panics
///
/// Panics with "Could not collect dataframe" if the lazy query fails, or with
/// "Could not write to stream" if writing to stdout fails.
pub fn write_ipc(df: LazyFrame) {
    let mut collected = df.collect().expect("Could not collect dataframe");
    let stdout = io::stdout();
    let mut writer = IpcStreamWriter::new(stdout.lock());
    writer
        .finish(&mut collected)
        .expect("Could not write to stream");
}
/// Take a Polars Dataframe and write it as CSV to stdout
2022-12-26 00:07:50 +01:00
pub fn dump_csv_to_stdout(ldf: LazyFrame) {
let _res: () = match CsvWriter::new(io::stdout().lock())
.finish(&mut ldf.collect().expect("Could not collect"))
{
2022-11-20 01:34:04 +01:00
Ok(_ok) => (),
Err(_e) => (),
};
}
/// Write a Polars DataFrame to Parquet
2022-12-26 00:07:50 +01:00
pub fn write_parquet(ldf: LazyFrame, path: String) {
// Selected compression not implemented yet
2022-12-26 00:07:50 +01:00
let mut p = PathBuf::new();
p.push(path);
ldf.sink_parquet(
p,
ParquetWriteOptions {
compression: ParquetCompression::Snappy,
statistics: true,
row_group_size: None,
data_pagesize_limit: None,
maintain_order: false,
},
2022-12-26 13:54:17 +01:00
)
.expect("Could not save");
}