106 lines
3.1 KiB
Rust
106 lines
3.1 KiB
Rust
use polars_io::prelude::*;
|
|
use polars_lazy::prelude::*;
|
|
use std::io;
|
|
use std::io::Read;
|
|
use std::path::PathBuf;
|
|
|
|
/// Read CSV file
|
|
pub fn read_csv(path: String, delimiter: u8) -> LazyFrame {
|
|
LazyCsvReader::new(path)
|
|
.with_delimiter(delimiter)
|
|
.with_infer_schema_length(None)
|
|
.finish()
|
|
.expect("Could not load file")
|
|
}
|
|
|
|
/// Read parquet and return a Polars LazyFrame
|
|
pub fn read_parquet(path: String) -> LazyFrame {
|
|
LazyFrame::scan_parquet(path, ScanArgsParquet::default()).expect("Could not read parquet file")
|
|
}
|
|
|
|
/// Read IPC setream
|
|
pub fn read_ipc() -> LazyFrame {
|
|
let mut buffer = Vec::new();
|
|
let _res: () = match io::stdin().lock().read_to_end(&mut buffer) {
|
|
Ok(_ok) => (),
|
|
Err(_e) => (),
|
|
};
|
|
let cursor = io::Cursor::new(buffer);
|
|
match IpcStreamReader::new(cursor).finish() {
|
|
Ok(df) => df.lazy(),
|
|
Err(_e) => LazyFrame::default(),
|
|
}
|
|
}
|
|
|
|
/// Read CSV format from stdin and return a Polars DataFrame
|
|
pub fn load_csv_from_stdin(delimiter: u8) -> LazyFrame {
|
|
let mut buffer = Vec::new();
|
|
let _res: () = match io::stdin().lock().read_to_end(&mut buffer) {
|
|
Ok(_ok) => (),
|
|
Err(_e) => (),
|
|
};
|
|
let cursor = io::Cursor::new(buffer);
|
|
match CsvReader::new(cursor).with_delimiter(delimiter).finish() {
|
|
Ok(df) => df.lazy(),
|
|
Err(_e) => LazyFrame::default(),
|
|
}
|
|
}
|
|
|
|
/// Read CSV format from stdin and return a Polars DataFrame
|
|
pub fn load_parquet_from_stdin() -> LazyFrame {
|
|
let mut buffer = Vec::new();
|
|
let _res: () = match io::stdin().lock().read_to_end(&mut buffer) {
|
|
Ok(_ok) => (),
|
|
Err(_e) => (),
|
|
};
|
|
let cursor = io::Cursor::new(buffer);
|
|
match ParquetReader::new(cursor).finish() {
|
|
Ok(df) => df.lazy(),
|
|
Err(_e) => LazyFrame::default(),
|
|
}
|
|
}
|
|
|
|
/// Write to IPC steram
|
|
pub fn write_ipc(df: LazyFrame) {
|
|
IpcStreamWriter::new(io::stdout().lock())
|
|
.finish(&mut df.collect().expect("Could not collect dataframe"))
|
|
.expect("Could not write to stream");
|
|
}
|
|
|
|
/// Take a Polars Dataframe and write it as CSV to stdout
|
|
pub fn dump_csv_to_stdout(ldf: LazyFrame) {
|
|
let _res: () = match CsvWriter::new(io::stdout().lock())
|
|
.finish(&mut ldf.collect().expect("Could not collect"))
|
|
{
|
|
Ok(_ok) => (),
|
|
Err(_e) => (),
|
|
};
|
|
}
|
|
|
|
/// Write a Polars DataFrame to Parquet
|
|
/// Not yet supported in standard executor
|
|
pub fn sink_parquet(ldf: LazyFrame, path: String) {
|
|
// Selected compression not implemented yet
|
|
let mut p = PathBuf::new();
|
|
p.push(path);
|
|
ldf.sink_parquet(
|
|
p,
|
|
ParquetWriteOptions {
|
|
compression: ParquetCompression::Snappy,
|
|
statistics: true,
|
|
row_group_size: None,
|
|
data_pagesize_limit: None,
|
|
maintain_order: false,
|
|
},
|
|
)
|
|
.expect("Could not save");
|
|
}
|
|
|
|
pub fn write_parquet(ldf: LazyFrame, path: String) {
|
|
// Selected compression not implemented yet
|
|
let mut file = std::fs::File::create(path).unwrap();
|
|
ParquetWriter::new(&mut file)
|
|
.finish(&mut ldf.collect().expect("Could not collect"))
|
|
.unwrap();
|
|
}
|