Implemented functionalities for 0.4
ci/woodpecker/push/woodpecker Pipeline was successful Details
ci/woodpecker/tag/woodpecker Pipeline failed Details

main 0.4
Guillem Borrell 1 year ago
parent 978afec7f4
commit 79a8a82bea

@ -1,7 +1,7 @@
[package]
name = "dr"
description = "Command-line data file processing in Rust"
version = "0.3.1"
version = "0.4"
edition = "2021"
include = [
"**/*.rs",
@ -14,7 +14,4 @@ repository = "https://git.guillemborrell.es/guillem/dr"
[dependencies]
clap = {version = "4.0", features = ["cargo"]}
polars = "0.25"
polars-sql = "0.2.1"
polars-lazy = "0.25"
polars-io = {"version" = "0.25", features = ["parquet"]}
polars = {"version" = "0.25", features = ["sql", "lazy", "parquet", "decompress", "ipc", "ipc_streaming", "docs-selection"]}

@ -9,51 +9,21 @@ You may wonder why I'm implementing this, since there's already [xsv](https://gi
1. This what I'm implementing to learn Rust
2. The Rust data ecosystem has evolved immensely since xsv was sarted. Now we can add things like SQL commands to filter csv files, or translate results to parquet files.
## Example
```bash
$ head wine.csv
Wine,Alcohol,Malic.acid,Ash,Acl,Mg,Phenols,Flavanoids,Nonflavanoid.phenols,Proanth,Color.int,Hue,OD,Proline
1,14.23,1.71,2.43,15.6,127,2.8,3.06,.28,2.29,5.64,1.04,3.92,1065
1,13.2,1.78,2.14,11.2,100,2.65,2.76,.26,1.28,4.38,1.05,3.4,1050
1,13.16,2.36,2.67,18.6,101,2.8,3.24,.3,2.81,5.68,1.03,3.17,1185
1,14.37,1.95,2.5,16.8,113,3.85,3.49,.24,2.18,7.8,.86,3.45,1480
1,13.24,2.59,2.87,21,118,2.8,2.69,.39,1.82,4.32,1.04,2.93,735
1,14.2,1.76,2.45,15.2,112,3.27,3.39,.34,1.97,6.75,1.05,2.85,1450
1,14.39,1.87,2.45,14.6,96,2.5,2.52,.3,1.98,5.25,1.02,3.58,1290
1,14.06,2.15,2.61,17.6,121,2.6,2.51,.31,1.25,5.05,1.06,3.58,1295
1,14.83,1.64,2.17,14,97,2.8,2.98,.29,1.98,5.2,1.08,2.85,1045
$ cat wine.csv | dr sql "select Wine, avg(Alcohol) from this group by Wine" | dr print
shape: (3, 2)
┌──────┬───────────┐
│ Wine ┆ Alcohol │
│ --- ┆ --- │
│ i64 ┆ f64 │
╞══════╪═══════════╡
│ 3 ┆ 13.15375 │
├╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┤
│ 1 ┆ 13.744746 │
├╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┤
│ 2 ┆ 12.278732 │
└──────┴───────────┘
```
## Howto
The `dr` command offers a set of subcommands, each one of them with a different functionality. You can get the available subcommands with:
## Help
```bash
```
$ dr --help
Command-line data file processing in Rust
Usage: dr [COMMAND]
Commands:
csv Read csv, output arrow stream
sql Runs a sql statement on the file
print Pretty prints the table
rpq Read parquet file
wpq Write to a parquet file
wpq Write to a paquet file
help Print this message or the help of the given subcommand(s)
Options:
@ -61,11 +31,21 @@ Options:
-V, --version Print version information
```
Subcommands can be pipelined unless reading from a file, writing to a file, or pretty prints data. What goes through the pipeline is a plain-text comma separated values with a header. While this may not be the best choice in terms of performance, allows `dr` subcommands to be combined with the usual unix-style command-line tools like `cat`, `head`, `grep`, `awk` and `sed`:
## Howto
`dr` is convenience command to explore, transform, and analyze csv and parquet files to save you from writing throwaway python scripts or create a custom container image for verys simple tasks. It's designed to make the life of a data engineer a little easier.
Assume you have a very large csv file, and you just want to translate it to parquet with some type inference and sane defaults. With `dr` this is as easy as:
```
$ dr csv wine.csv -P wine.pq
```
Parquet files are binary, and you may want to check that you've not written nonsense by printing the header on your terminal.
```bash
$ cat wine.csv | head -n 5 | dr print
shape: (4, 14)
```
$ dr rpq wine.pq -a
shape: (5, 14)
┌──────┬─────────┬────────────┬──────┬─────┬───────────┬──────┬──────┬─────────┐
│ Wine ┆ Alcohol ┆ Malic.acid ┆ Ash ┆ ... ┆ Color.int ┆ Hue ┆ OD ┆ Proline │
│ --- ┆ --- ┆ --- ┆ --- ┆ ┆ --- ┆ --- ┆ --- ┆ --- │
@ -78,112 +58,91 @@ shape: (4, 14)
│ 1 ┆ 13.16 ┆ 2.36 ┆ 2.67 ┆ ... ┆ 5.68 ┆ 1.03 ┆ 3.17 ┆ 1185 │
├╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
│ 1 ┆ 14.37 ┆ 1.95 ┆ 2.5 ┆ ... ┆ 7.8 ┆ 0.86 ┆ 3.45 ┆ 1480 │
├╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
│ 1 ┆ 13.24 ┆ 2.59 ┆ 2.87 ┆ ... ┆ 4.32 ┆ 1.04 ┆ 2.93 ┆ 735 │
└──────┴─────────┴────────────┴──────┴─────┴───────────┴──────┴──────┴─────────┘
```
Note that when `dr` loads csv data also tries to guess the data type of each field.
### Parquet
Maybe the most interesing feature of `dr` is the ability to process csv and parquet files using SQL, while solutions like `xsv` and `csvkit` rely on a rich set of subcommands and options. If you already know SQL, there's no need to read any more documentation to select, filter, or group data. The only thing you need to remember is that the table will be called `this`. The following command outputs a csv of the wine with the highest concentration of alcohol in the popular wine dataset:
`dr` is also useful to translate your csv files to parquet with a single command:
```bash
$ cat wine.csv | dr wpq wine.pq
```
Or explore parquet files
```bash
$ dr rpq wine.pq | head -n 5 | dr print
shape: (4, 14)
dr csv wine.csv -q "select * from this where Alcohol = max(Alcohol)" | dr print
shape: (1, 14)
┌──────┬─────────┬────────────┬──────┬─────┬───────────┬──────┬──────┬─────────┐
│ Wine ┆ Alcohol ┆ Malic.acid ┆ Ash ┆ ... ┆ Color.int ┆ Hue ┆ OD ┆ Proline │
│ --- ┆ --- ┆ --- ┆ --- ┆ ┆ --- ┆ --- ┆ --- ┆ --- │
│ i64 ┆ f64 ┆ f64 ┆ f64 ┆ ┆ f64 ┆ f64 ┆ f64 ┆ i64 │
╞══════╪═════════╪════════════╪══════╪═════╪═══════════╪══════╪══════╪═════════╡
│ 1 ┆ 14.23 ┆ 1.71 ┆ 2.43 ┆ ... ┆ 5.64 ┆ 1.04 ┆ 3.92 ┆ 1065 │
├╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
│ 1 ┆ 13.2 ┆ 1.78 ┆ 2.14 ┆ ... ┆ 4.38 ┆ 1.05 ┆ 3.4 ┆ 1050 │
├╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
│ 1 ┆ 13.16 ┆ 2.36 ┆ 2.67 ┆ ... ┆ 5.68 ┆ 1.03 ┆ 3.17 ┆ 1185 │
├╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
│ 1 ┆ 14.37 ┆ 1.95 ┆ 2.5 ┆ ... ┆ 7.8 ┆ 0.86 ┆ 3.45 ┆ 1480 │
│ 1 ┆ 14.83 ┆ 1.64 ┆ 2.17 ┆ ... ┆ 5.2 ┆ 1.08 ┆ 2.85 ┆ 1045 │
└──────┴─────────┴────────────┴──────┴─────┴───────────┴──────┴──────┴─────────┘
```
If you don't use any option that formats the output of the results, `dr` outputs Arrow's IPC format, meaning that multiple `dr` calls can be efficiently chained with very low overhead. The following script loads one month of NY taxi data and executes two sql queries on the data.
## Performance
`dr` is implemented in Rust with the goal of achieving the highest possible performance. Take for instance a simple read, groupby, and aggregate operation with ~30MB of data:
```bash
$ time cat data/walmart_train.csv | dr sql "select Dept, avg("Weekly_Sales") from this group by Dept" | dr print
shape: (81, 2)
┌──────┬──────────────┐
│ Dept ┆ Weekly_Sales │
│ --- ┆ --- │
│ i64 ┆ f64 │
╞══════╪══════════════╡
│ 30 ┆ 4118.197208 │
├╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 16 ┆ 14245.63827 │
├╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 56 ┆ 3833.706211 │
├╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 24 ┆ 6353.604562 │
├╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ ... ┆ ... │
├╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 31 ┆ 2339.440287 │
├╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 59 ┆ 694.463564 │
├╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 27 ┆ 1583.437727 │
├╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 77 ┆ 328.9618 │
└──────┴──────────────┘
real 0m0.089s
user 0m0.116s
sys 0m0.036s
```
$ dr rpq data/yellow_tripdata_2014-01.parquet \
-q "select count(1) as cnt, passenger_count from this group by passenger_count" \
| dr sql "select * from this order by cnt desc" \
| dr print
┌─────────┬─────────────────┐
│ cnt ┆ passenger_count │
│ --- ┆ --- │
│ u32 ┆ i64 │
╞═════════╪═════════════════╡
│ 9727321 ┆ 1 │
├╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 1891588 ┆ 2 │
├╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 789070 ┆ 5 │
├╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 566248 ┆ 3 │
├╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ ... ┆ ... │
├╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 19 ┆ 208 │
├╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 16 ┆ 9 │
├╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 7 ┆ 7 │
├╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 5 ┆ 8 │
└─────────┴─────────────────┘
```
Let's compare that with the followint Python script that leverages Pandas to read the data, and compute the aggregation:
## Reference
```python
#!/usr/bin/env python3
Some commands that generate raw output in ipc format.
import sys
import pandas as pd
* Read a csv or parquet file and print the header: `dr {csv, rpq} [file] -a`
* Read a csv or parquet file, execute a SQL statement, and output the results in stdout using Arrow's ipc format `dr {csv, rpq} [file] -q "statement"`
* Read a csv or parquet file and print a summary of each column: `dr {csv, rpq} [file] -s "[query]"`
* Read a csv or parquet file, execute a query, and output the results in stdout using the csv format `dr {csv, rpq} [file] -s "[query]" -t`
* Read a csv and write a parquet file with the same contents: `dr csv [file.csv] -P [file.pq]`
df = pd.read_csv(sys.stdin)
print(df.groupby("Dept", sort=False, as_index=False).Weekly_Sales.mean())
```
Some commands that convert raw input in ipc format
* Read from stdin in ipc and pretty print the table: `dr print`
* Read from stdin in csv and pretty print the table: `dr print -t`
* Read from stdin in ipc and write the data in parquet: `dr wpq [file.pq]`
## Performance
```bash
$ time cat data/walmart_train.csv | ./python/group.py
Dept Weekly_Sales
0 1 19213.485088
1 2 43607.020113
2 3 11793.698516
3 4 25974.630238
4 5 21365.583515
.. ... ...
76 99 415.487065
77 39 11.123750
78 50 2658.897010
79 43 1.193333
80 65 45441.706224
[81 rows x 2 columns]
real 0m0.717s
user 0m0.627s
sys 0m0.282s
This command runs two dr processes. The first one makes an aggregation on a compressed parquet file of 144MB of size, and the second one just orders the result:
```
$ dr rpq data/yellow_tripdata_2014-01.parquet \
-q "select count(1) as cnt, passenger_count from this group by passenger_count" \
| dr sql "select * from this order by cnt desc" \
> /dev/null
```
Note that there's roughly a 6x speedup. This considering that this operation in particular is heavily optimized in Pandas and most of the run time is spent in parsing and reading from stdin.
On a very very old machine (Intel(R) Core(TM) i5-6500T CPU @ 2.50GHz), this takes around half a second, which is roughly the time needed to read and decompress the parquet file. Polar's csv and parquet readers have some decent performance, so you can count on `dr` to be one of the fastest in the block.
## Caveats
1. `dr` uses Polars to build and transform dataframes in Rust, and the entire table has to be loaded in memory. At the time when `dr` was created, streaming support didn't get along very well with SQL contexts.
2. `dr` uses Polars' SQLContext to execute the query which supports a small subset of the SQL language.
## Built standing on the shoulders of giants

@ -1,7 +0,0 @@
#!/usr/bin/env python3
import sys
import pandas as pd
df = pd.read_csv(sys.stdin)
print(df.groupby("Dept", sort=False, as_index=False).Weekly_Sales.mean())

@ -1,22 +1,53 @@
use polars::frame::DataFrame;
use polars::prelude::*;
use std::fs;
use std::io;
use std::io::Read;
/// Read CSV format from stdin and return a Polars DataFrame
pub fn load_csv_from_stdin() -> DataFrame {
let mut buffer = String::new();
let _res: () = match io::stdin().read_to_string(&mut buffer) {
/// Read CSV file
pub fn read_csv(path: String) -> LazyFrame {
LazyCsvReader::new(path)
.finish()
.expect("Could not load file")
}
/// Read parquet and return a Polars LazyFrame
pub fn read_parquet(path: String) -> LazyFrame {
LazyFrame::scan_parquet(path, ScanArgsParquet::default()).expect("Could not read parquet file")
}
/// Read IPC setream
pub fn read_ipc() -> LazyFrame {
let mut buffer = Vec::new();
let _res: () = match io::stdin().lock().read_to_end(&mut buffer) {
Ok(_ok) => (),
Err(_e) => (),
};
let cursor = io::Cursor::new(buffer.as_bytes());
let df = match CsvReader::new(cursor).finish() {
Ok(df) => df,
Err(_e) => DataFrame::default(),
let cursor = io::Cursor::new(buffer);
match IpcStreamReader::new(cursor).finish() {
Ok(df) => df.lazy(),
Err(_e) => LazyFrame::default(),
}
}
/// Read CSV format from stdin and return a Polars DataFrame
pub fn load_csv_from_stdin() -> LazyFrame {
let mut buffer = Vec::new();
let _res: () = match io::stdin().lock().read_to_end(&mut buffer) {
Ok(_ok) => (),
Err(_e) => (),
};
df
let cursor = io::Cursor::new(buffer);
match CsvReader::new(cursor).finish() {
Ok(df) => df.lazy(),
Err(_e) => LazyFrame::default(),
}
}
/// Write to IPC steram
pub fn write_ipc(df: LazyFrame) {
IpcStreamWriter::new(io::stdout().lock())
.finish(&mut df.collect().expect("Could not collect dataframe"))
.expect("Could not write to stream");
}
/// Take a Polars Dataframe and write it as CSV to stdout
@ -27,19 +58,6 @@ pub fn dump_csv_to_stdout(df: &mut DataFrame) {
};
}
/// Read parquet and return a Polars DataFrame
pub fn read_parquet(path: String) -> DataFrame {
let file = fs::File::open(path).expect("Could not open file");
let df = match ParquetReader::new(file).finish() {
Ok(df) => df,
Err(e) => {
eprintln!("{e}");
DataFrame::default()
}
};
df
}
/// Write a Polars DataFrame to Parquet
pub fn write_parquet(
mut df: DataFrame,

@ -1,54 +1,179 @@
mod io;
mod sql;
use clap::{arg, command, Command};
use clap::{arg, command, ArgAction, Command};
fn main() {
let matches = command!()
.subcommand(
Command::new("csv")
.about("Read csv, output arrow stream")
.arg(arg!([path] "Path to CSV file"))
.arg(arg!(-d --delimiter <String> "Column delimiter. Assume ,").required(false))
.arg(arg!(-q --query <String> "Execute query on the file").required(false))
.arg(
arg!(-s --summary ... "Summarize the data")
.required(false)
.action(ArgAction::SetTrue),
)
.arg(
arg!(-t --text ... "Output text instead of binary")
.required(false)
.action(ArgAction::SetTrue),
)
.arg(arg!(-P --parquet <String> "Write output as a parquet file").required(false))
.arg(
arg!(-a --head ... "Print the header of the table")
.required(false)
.action(ArgAction::SetTrue),
),
)
.subcommand(
Command::new("sql")
.about("Runs a sql statement on the file")
.arg(arg!([statement] "SQL statement"))
.arg(
arg!(-t --text ... "Input text instead of binary")
.required(false)
.action(ArgAction::SetTrue),
)
.arg(arg!(-d --delimiter <String> "Column delimiter").required(false)),
)
.subcommand(Command::new("print").about("Pretty prints the table"))
.subcommand(
Command::new("print").about("Pretty prints the table").arg(
arg!(-t --text ... "Inputs csv instead of binary")
.required(false)
.action(ArgAction::SetTrue),
),
)
.subcommand(
Command::new("rpq")
.about("Read parquet file")
.arg(arg!([path] "Path to the parquet file")),
.arg(arg!([path] "Path to the parquet file"))
.arg(arg!(-q --query <String> "Execute query on the file").required(false))
.arg(
arg!(-s --summary ... "Summarize the data")
.required(false)
.action(ArgAction::SetTrue),
)
.arg(
arg!(-t --text ... "Output text instead of binary")
.required(false)
.action(ArgAction::SetTrue),
)
.arg(arg!(-P --text <String> "Write the result as a parquet file").required(false))
.arg(
arg!(-a --head ... "Print the header of the table")
.required(false)
.action(ArgAction::SetTrue),
),
)
.subcommand(
Command::new("wpq")
.about("Write to a paquet file")
.arg(
arg!(-t --text ... "Output text instead of binary")
.required(false)
.action(ArgAction::SetTrue),
)
.arg(arg!([path] "Path to the new parquet file")),
)
.get_matches();
if let Some(matches) = matches.subcommand_matches("sql") {
//if let Some(delimiter) = matches.get_one::<String>("delimiter") {
// println!("DEBUG: Delimiter: {delimiter}")
//} else {
// println!("DEBUG: No delimiter")
//}
if let Some(statement) = matches.get_one::<String>("statement") {
sql::execute(statement);
if let Some(_matches) = matches.subcommand_matches("csv") {
if let Some(path) = _matches.get_one::<String>("path") {
let mut ldf = io::read_csv(path.to_string());
if let Some(query) = _matches.get_one::<String>("query") {
ldf = sql::execute(ldf, query);
}
if _matches.get_flag("summary") {
let df = ldf.collect().expect("Could not collect");
println!("{}", df.describe(None));
} else if _matches.get_flag("head") {
let df = ldf.fetch(5).expect("Could not fetch");
println!("{}", df)
} else {
if _matches.get_flag("text") {
io::dump_csv_to_stdout(&mut ldf.collect().expect("Could not collect"));
} else {
if let Some(path) = _matches.get_one::<String>("parquet") {
io::write_parquet(
ldf.collect().expect("Could not collect"),
path.to_string(),
"lz4raw".to_string(),
true,
Some(0),
);
} else {
io::write_ipc(ldf);
}
}
}
}
} else if let Some(_matches) = matches.subcommand_matches("sql") {
if let Some(statement) = _matches.get_one::<String>("statement") {
let ldf = if _matches.get_flag("text") {
io::load_csv_from_stdin()
} else {
io::read_ipc()
};
let res = sql::execute(ldf, statement);
io::write_ipc(res);
} else {
let mut df = io::load_csv_from_stdin();
io::dump_csv_to_stdout(&mut df);
io::write_ipc(io::read_ipc());
}
} else if let Some(_matches) = matches.subcommand_matches("print") {
let df = io::load_csv_from_stdin();
println!("{}", df)
} else if let Some(matches) = matches.subcommand_matches("rpq") {
if let Some(path) = matches.get_one::<String>("path") {
let mut df = io::read_parquet(path.to_string());
io::dump_csv_to_stdout(&mut df);
let df = if _matches.get_flag("text") {
io::load_csv_from_stdin()
} else {
io::read_ipc()
};
println!("{}", df.collect().expect("Could not collect"));
} else if let Some(_matches) = matches.subcommand_matches("rpq") {
if let Some(path) = _matches.get_one::<String>("path") {
let mut ldf = io::read_parquet(path.to_string());
if let Some(query) = _matches.get_one::<String>("query") {
ldf = sql::execute(ldf, query);
}
if _matches.get_flag("summary") {
let df = ldf.collect().expect("Could not collect");
println!("{}", df.describe(None));
} else if _matches.get_flag("head") {
let df = ldf.fetch(5).expect("Could not fetch");
println!("{}", df)
} else {
if _matches.get_flag("text") {
io::dump_csv_to_stdout(&mut ldf.collect().expect("Could not collect"));
} else {
if let Some(path) = _matches.get_one::<String>("parquet") {
io::write_parquet(
ldf.collect().expect("Could not collect"),
path.to_string(),
"lz4raw".to_string(),
true,
Some(0),
);
} else {
io::write_ipc(ldf);
}
}
}
} else {
eprintln!("File not found")
}
} else if let Some(matches) = matches.subcommand_matches("wpq") {
if let Some(path) = matches.get_one::<String>("path") {
let df = io::load_csv_from_stdin();
io::write_parquet(df, path.to_string(), "lz4raw".to_string(), true, Some(0));
} else if let Some(_matches) = matches.subcommand_matches("wpq") {
if let Some(path) = _matches.get_one::<String>("path") {
let df = if _matches.get_flag("text") {
io::load_csv_from_stdin()
} else {
io::read_ipc()
};
io::write_parquet(
df.collect().expect("Could not collect"),
path.to_string(),
"lz4raw".to_string(),
true,
Some(0),
);
} else {
eprintln!("Could now write to parquet");
}

@ -1,19 +1,15 @@
use crate::io::dump_csv_to_stdout;
use crate::io::load_csv_from_stdin;
use polars_lazy::frame::IntoLazy;
use polars_sql::SQLContext;
use polars::prelude::LazyFrame;
use polars::sql::SQLContext;
pub fn execute(statement: &String) {
if let Ok(mut context) = SQLContext::try_new() {
let df = load_csv_from_stdin();
context.register("this", df.lazy());
if let Ok(res) = context.execute(statement) {
if let Ok(mut res) = res.collect() {
dump_csv_to_stdout(&mut res);
};
};
if let Err(e) = context.execute(statement) {
eprintln!("Query execution error {e}")
};
};
pub fn execute(ldf: LazyFrame, statement: &String) -> LazyFrame {
let mut context = SQLContext::try_new().expect("Could not create context");
context.register("this", ldf);
match context.execute(statement) {
Ok(res) => res,
Err(e) => {
eprintln!("Query execution error {e}");
LazyFrame::default()
}
}
}

Loading…
Cancel
Save