Compare commits
26 commits
Author | SHA1 | Date | |
---|---|---|---|
c92d1ad1df | |||
9033f81b98 | |||
Guillem Borrell Nogueras | f4b3a525bb | ||
Guillem Borrell Nogueras | c678cc27f3 | ||
8180def799 | |||
cabc0e7dfe | |||
02c6b50d00 | |||
83a4138f64 | |||
92fec23932 | |||
f0730efcd9 | |||
cbf318690c | |||
edaea203b7 | |||
951bd82a2b | |||
4e94ad295b | |||
d534bdef8d | |||
99d58ff9c3 | |||
facae6af40 | |||
1858777c69 | |||
59adb12078 | |||
1e18c9ae9f | |||
e4e9b71674 | |||
e29b3d18e8 | |||
717da2e1b6 | |||
Guillem Borrell Nogueras | 4c26c4c344 | ||
Guillem Borrell Nogueras | 98a2a983a9 | ||
Guillem Borrell Nogueras | 06a197b07c |
11
Cargo.toml
11
Cargo.toml
|
@ -1,7 +1,7 @@
|
|||
[package]
|
||||
name = "dr"
|
||||
description = "Command-line data file processing in Rust"
|
||||
version = "0.5.3"
|
||||
version = "0.7.0"
|
||||
edition = "2021"
|
||||
include = [
|
||||
"**/*.rs",
|
||||
|
@ -14,7 +14,8 @@ repository = "https://git.guillemborrell.es/guillem/dr"
|
|||
|
||||
[dependencies]
|
||||
clap = {version = "4.0", features = ["cargo"]}
|
||||
polars-lazy = {"version" = "0.26", "features" = ["parquet", "ipc", "csv-file"]}
|
||||
polars-core = {"version" = "0.26", "features" = ["describe", "fmt"]}
|
||||
polars-io = {"version" = "0.26", "features" = ["ipc_streaming"]}
|
||||
polars-sql = {"version" = "0.2.2"}
|
||||
polars-lazy = {"version" = "0.27", "features" = ["parquet", "ipc", "csv-file"]}
|
||||
polars-core = {"version" = "0.27", "features" = ["describe", "fmt"]}
|
||||
polars-io = {"version" = "0.27", "features" = ["ipc_streaming"]}
|
||||
polars-sql = {"version" = "0.2.3"}
|
||||
sea-query = {"version" = "0.28"}
|
||||
|
|
105
README.md
105
README.md
|
@ -16,21 +16,36 @@ You can install dr the rust way with `cargo install dr` but downloading a binary
|
|||
|
||||
```
|
||||
$ dr --help
|
||||
Command-line data file processing in Rust
|
||||
dr is a handy command line tool to handle csv and parquet files.
|
||||
It is designed to integrate nicely with other command line tools
|
||||
like cat, sed, awk and database clients cli. You can find more
|
||||
information an a short tutorial https://git.guillemborrell.es/guillem/dr
|
||||
|
||||
|
||||
Usage: dr [COMMAND]
|
||||
|
||||
Commands:
|
||||
csv Read csv, output arrow stream
|
||||
sql Runs a sql statement on the file
|
||||
print Pretty prints the table
|
||||
rpq Read parquet file
|
||||
wpq Write to a paquet file
|
||||
help Print this message or the help of the given subcommand(s)
|
||||
csv
|
||||
Read csv, output arrow stream
|
||||
schema
|
||||
Several table schema related utilities
|
||||
sql
|
||||
Runs a sql statement on the file
|
||||
print
|
||||
Pretty prints the table
|
||||
rpq
|
||||
Read parquet file
|
||||
wpq
|
||||
Write to a paquet file
|
||||
help
|
||||
Print this message or the help of the given subcommand(s)
|
||||
|
||||
Options:
|
||||
-h, --help Print help information
|
||||
-V, --version Print version information
|
||||
-h, --help
|
||||
Print help information (use `-h` for a summary)
|
||||
|
||||
-V, --version
|
||||
Print version information
|
||||
```
|
||||
|
||||
## Howto
|
||||
|
@ -111,6 +126,39 @@ $ dr rpq data/yellow_tripdata_2014-01.parquet \
|
|||
└─────────┴─────────────────┘
|
||||
```
|
||||
|
||||
### Operate with SQL databases
|
||||
|
||||
How many times did you have to insert a csv file (sometimes larger than memory) to a database? Tens of times? Hundreds? You've probably used Pandas for that, since it can infer the table's datatypes. So a simple data operation becomes a python script with Pandas and a driver for PostgreSQL as dependencies.
|
||||
|
||||
Now dr can provide the table creation statement with a handful of columns:
|
||||
|
||||
```
|
||||
$ head wine.csv | dr schema -i -p -n wine
|
||||
CREATE TABLE IF NOT EXISTS "wine" ( );
|
||||
ALTER TABLE "wine" ADD COLUMN "Wine" integer;
|
||||
ALTER TABLE "wine" ADD COLUMN "Alcohol" real;
|
||||
ALTER TABLE "wine" ADD COLUMN "Malic.acid" real;
|
||||
ALTER TABLE "wine" ADD COLUMN "Ash" real;
|
||||
ALTER TABLE "wine" ADD COLUMN "Acl" real;
|
||||
ALTER TABLE "wine" ADD COLUMN "Mg" integer;
|
||||
ALTER TABLE "wine" ADD COLUMN "Phenols" real;
|
||||
ALTER TABLE "wine" ADD COLUMN "Flavanoids" real;
|
||||
ALTER TABLE "wine" ADD COLUMN "Nonflavanoid.phenols" real;
|
||||
ALTER TABLE "wine" ADD COLUMN "Proanth" real;
|
||||
ALTER TABLE "wine" ADD COLUMN "Color.int" real;
|
||||
ALTER TABLE "wine" ADD COLUMN "Hue" real;
|
||||
ALTER TABLE "wine" ADD COLUMN "OD" real;
|
||||
ALTER TABLE "wine" ADD COLUMN "Proline" integer;
|
||||
```
|
||||
|
||||
More about this in the Examples section
|
||||
|
||||
Since most databases can ingest and spit CSV files, some simple operations can be enhanced with dr, like storing the results of a query in a parquet file
|
||||
|
||||
```
|
||||
$ psql -c "copy (select * from wine) to stdout with (FORMAT 'csv', HEADER)" | dr csv -i -P wine.pq
|
||||
```
|
||||
|
||||
## Reference
|
||||
|
||||
Some commands that generate raw output in ipc format.
|
||||
|
@ -125,7 +173,42 @@ Some commands that convert raw input in ipc format
|
|||
|
||||
* Read from stdin in ipc and pretty print the table: `dr print`
|
||||
* Read from stdin in csv and pretty print the table: `dr print -t`
|
||||
* Read from stdin in ipc and write the data in parquet: `dr wpq [file.pq]`
|
||||
* Read from stdin in ipc and write the data in parquet: `dr wpq [file.pq]`
|
||||
|
||||
Some commands that read csv data from stdin
|
||||
|
||||
* Read csv from stdin and print the schema as it would be inserted in a postgresql database: `dr schema -i -p -n tablename`
|
||||
* Reas csv from stdin and save as parquet, inferring types: `dr csv -i -P filename.pq`
|
||||
|
||||
## Examples
|
||||
|
||||
### Inserting CSV into postgres
|
||||
|
||||
Assume that you were given a large (several GiB) with a weird (latin1) encoding, and you want to insert it into postgres. This dataset may be too large to store it in memory in one go, so you'd like to stream it into the database. You need to
|
||||
|
||||
* Read the csv file
|
||||
* Infer the schema, and create a table
|
||||
* Change the encoding of the file to the same as the database
|
||||
|
||||
You can use `dr` to turn this into a two-step process, and pipe the encoding conversion in one go. The first step would be to infer the schema of the resulting table and creating the table
|
||||
|
||||
```
|
||||
$ head large_csv_file.csv | iconv -f latin1 -t utf-8 | dr schema -i -p -n tablename | pgsql -U username -h hostname database
|
||||
```
|
||||
|
||||
The second step would be leveraging the `pgsql` command to write the contents of the file into the database
|
||||
|
||||
```
|
||||
$ cat large_csv_file.csv | iconv -f latin1 -t UTF-8 | psql -U username -h hostname -c "\copy tablename from stdin with (FORMAT 'csv', HEADER)" database
|
||||
```
|
||||
|
||||
The ingestion process is atomic, meaning that if `pgsql` fails to insert any record, no insertions will be made at all. If the insertion fails, probably because some column of type varchar can't fit the inferred type, you can change the type with:
|
||||
|
||||
```
|
||||
$ psql -U username -h hostname -c 'alter table tablename alter column "LongDescription" type varchar(1024);' database
|
||||
```
|
||||
|
||||
And try inserting again
|
||||
|
||||
## Performance
|
||||
|
||||
|
@ -142,7 +225,7 @@ On a very very old machine (Intel(R) Core(TM) i5-6500T CPU @ 2.50GHz), this take
|
|||
|
||||
## Caveats
|
||||
|
||||
1. `dr` uses Polars to build and transform dataframes in Rust, and the entire table has to be loaded in memory. At the time when `dr` was created, streaming support didn't get along very well with SQL contexts.
|
||||
1. `dr` uses Polars to build and transform dataframes in Rust, and the entire table may be loaded in memory. At the time when `dr` was created, streaming support didn't get along very well with SQL contexts.
|
||||
|
||||
2. `dr` uses Polars' SQLContext to execute the query which supports a small subset of the SQL language.
|
||||
|
||||
|
|
123
src/commands.rs
Normal file
123
src/commands.rs
Normal file
|
@ -0,0 +1,123 @@
|
|||
use clap::{arg, ArgAction, Command};
|
||||
|
||||
// Generate command line options for the csv command
|
||||
pub fn gen_csv_command() -> Command {
|
||||
Command::new("csv")
|
||||
.about("Read csv, output arrow stream")
|
||||
.arg(arg!([path] "Path to CSV file"))
|
||||
.arg(arg!(-d --delimiter <String> "Column delimiter. Assume ,").required(false))
|
||||
.arg(
|
||||
arg!(-i --stdin ... "Read from stdin")
|
||||
.required(false)
|
||||
.action(ArgAction::SetTrue),
|
||||
)
|
||||
.arg(arg!(-q --query <String> "Execute query on the file").required(false))
|
||||
.arg(
|
||||
arg!(-s --summary ... "Summarize the data")
|
||||
.required(false)
|
||||
.action(ArgAction::SetTrue),
|
||||
)
|
||||
.arg(
|
||||
arg!(-t --text ... "Output text instead of binary")
|
||||
.required(false)
|
||||
.action(ArgAction::SetTrue),
|
||||
)
|
||||
.arg(arg!(-P --parquet <String> "Write output as a parquet file").required(false))
|
||||
.arg(
|
||||
arg!(-a --head ... "Print the header of the table")
|
||||
.required(false)
|
||||
.action(ArgAction::SetTrue),
|
||||
)
|
||||
}
|
||||
|
||||
// Generate command line options for the schema command
|
||||
pub fn gen_schema_command() -> Command {
|
||||
Command::new("schema")
|
||||
.about("Several table schema related utilities")
|
||||
.arg(
|
||||
arg!(-i --stdin ... "Read from stdin")
|
||||
.required(false)
|
||||
.action(ArgAction::SetTrue),
|
||||
)
|
||||
.arg(arg!(-d --delimiter <String> "Column delimiter. Assume ,").required(false))
|
||||
.arg(arg!(-n --name <String> "Table name").required(false))
|
||||
.arg(arg!(-l --strlen <String> "Default length for string columns").required(false))
|
||||
.arg(
|
||||
arg!(-s --summary ... "Summarize the schema")
|
||||
.required(false)
|
||||
.action(ArgAction::SetTrue),
|
||||
)
|
||||
.arg(
|
||||
arg!(-p --postgresql ... "Create a postgresql table with schema")
|
||||
.required(false)
|
||||
.action(ArgAction::SetTrue),
|
||||
)
|
||||
}
|
||||
|
||||
// Generate command line options for the sql command
|
||||
pub fn gen_sql_command() -> Command {
|
||||
Command::new("sql")
|
||||
.about("Runs a sql statement on the file")
|
||||
.arg(arg!(-d --delimiter <String> "Column delimiter. Assume ,").required(false))
|
||||
.arg(arg!([statement] "SQL statement"))
|
||||
.arg(
|
||||
arg!(-t --text ... "Input text instead of binary")
|
||||
.required(false)
|
||||
.action(ArgAction::SetTrue),
|
||||
)
|
||||
.arg(arg!(-d --delimiter <String> "Column delimiter").required(false))
|
||||
}
|
||||
|
||||
// Generate command line options for the rpq command
|
||||
pub fn gen_rpq_command() -> Command {
|
||||
Command::new("rpq")
|
||||
.about("Read parquet file")
|
||||
.arg(arg!([path] "Path to the parquet file"))
|
||||
.arg(arg!(-q --query <String> "Execute query on the file").required(false))
|
||||
.arg(
|
||||
arg!(-s --summary ... "Summarize the data")
|
||||
.required(false)
|
||||
.action(ArgAction::SetTrue),
|
||||
)
|
||||
.arg(
|
||||
arg!(-i --stdin ... "Read from stdin instead than from a file")
|
||||
.required(false)
|
||||
.action(ArgAction::SetTrue),
|
||||
)
|
||||
.arg(
|
||||
arg!(-t --text ... "Output text instead of binary")
|
||||
.required(false)
|
||||
.action(ArgAction::SetTrue),
|
||||
)
|
||||
.arg(arg!(-P --parquet <String> "Write the result as a parquet file").required(false))
|
||||
.arg(
|
||||
arg!(-a --head ... "Print the header of the table")
|
||||
.required(false)
|
||||
.action(ArgAction::SetTrue),
|
||||
)
|
||||
}
|
||||
|
||||
// Generate command line options for the wpq command
|
||||
pub fn gen_wpq_command() -> Command {
|
||||
Command::new("wpq")
|
||||
.about("Write to a paquet file")
|
||||
.arg(arg!(-d --delimiter <String> "Column delimiter. Assume ,").required(false))
|
||||
.arg(
|
||||
arg!(-t --text ... "Input text instead of binary")
|
||||
.required(false)
|
||||
.action(ArgAction::SetTrue),
|
||||
)
|
||||
.arg(arg!([path] "Path to the new parquet file"))
|
||||
}
|
||||
|
||||
// Generate command line options for the print command
|
||||
pub fn gen_print_command() -> Command {
|
||||
Command::new("print")
|
||||
.about("Pretty prints the table")
|
||||
.arg(arg!(-d --delimiter <String> "Column delimiter. Assume ,").required(false))
|
||||
.arg(
|
||||
arg!(-t --text ... "Inputs csv instead of binary")
|
||||
.required(false)
|
||||
.action(ArgAction::SetTrue),
|
||||
)
|
||||
}
|
150
src/handlers.rs
Normal file
150
src/handlers.rs
Normal file
|
@ -0,0 +1,150 @@
|
|||
use crate::io;
|
||||
use crate::schema;
|
||||
use crate::sql;
|
||||
use clap::ArgMatches;
|
||||
use polars_lazy::prelude::LazyFrame;
|
||||
|
||||
// Handle csv command
|
||||
pub fn handle_csv(matches: &ArgMatches) {
|
||||
let delimiter = match matches.get_one::<String>("delimiter") {
|
||||
Some(delimiter) => delimiter.as_bytes()[0],
|
||||
None => b',',
|
||||
};
|
||||
let mut ldf = if matches.get_flag("stdin") {
|
||||
io::load_csv_from_stdin(delimiter)
|
||||
} else {
|
||||
let path = matches
|
||||
.get_one::<String>("path")
|
||||
.expect("Please, provide a file");
|
||||
io::read_csv(path.to_string(), delimiter)
|
||||
};
|
||||
if let Some(query) = matches.get_one::<String>("query") {
|
||||
ldf = sql::execute(ldf, query);
|
||||
}
|
||||
if matches.get_flag("summary") {
|
||||
let df = ldf.collect().expect("Could not collect");
|
||||
println!("{:?}", df.describe(None));
|
||||
} else if matches.get_flag("head") {
|
||||
let df = ldf.fetch(5).expect("Could not fetch");
|
||||
println!("{}", df)
|
||||
} else {
|
||||
if matches.get_flag("text") {
|
||||
io::dump_csv_to_stdout(ldf);
|
||||
} else {
|
||||
if let Some(path) = matches.get_one::<String>("parquet") {
|
||||
io::write_parquet(ldf, path.to_string());
|
||||
} else {
|
||||
io::write_ipc(ldf);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Handle the SQL command
|
||||
pub fn handle_sql(matches: &ArgMatches) {
|
||||
let delimiter = match matches.get_one::<String>("delimiter") {
|
||||
Some(delimiter) => delimiter.as_bytes()[0],
|
||||
None => b',',
|
||||
};
|
||||
if let Some(statement) = matches.get_one::<String>("statement") {
|
||||
let ldf = if matches.get_flag("text") {
|
||||
io::load_csv_from_stdin(delimiter)
|
||||
} else {
|
||||
io::read_ipc()
|
||||
};
|
||||
let res = sql::execute(ldf, statement);
|
||||
io::write_ipc(res);
|
||||
} else {
|
||||
io::write_ipc(io::read_ipc());
|
||||
}
|
||||
}
|
||||
|
||||
// Handle the print command
|
||||
pub fn handle_print(matches: &ArgMatches) {
|
||||
let delimiter = match matches.get_one::<String>("delimiter") {
|
||||
Some(delimiter) => delimiter.as_bytes()[0],
|
||||
None => b',',
|
||||
};
|
||||
let df = if matches.get_flag("text") {
|
||||
io::load_csv_from_stdin(delimiter)
|
||||
} else {
|
||||
io::read_ipc()
|
||||
};
|
||||
println!("{}", df.collect().expect("Could not collect"));
|
||||
}
|
||||
|
||||
// Handle the rpq command
|
||||
pub fn handle_rpq(matches: &ArgMatches) {
|
||||
let mut ldf = LazyFrame::default();
|
||||
if matches.get_flag("stdin") {
|
||||
ldf = io::load_parquet_from_stdin();
|
||||
} else if let Some(path) = matches.get_one::<String>("path") {
|
||||
ldf = io::read_parquet(path.to_string());
|
||||
} else {
|
||||
eprintln!("File not found or not reading from stdin")
|
||||
}
|
||||
if let Some(query) = matches.get_one::<String>("query") {
|
||||
ldf = sql::execute(ldf, query);
|
||||
}
|
||||
if matches.get_flag("summary") {
|
||||
let df = ldf.collect().expect("Could not collect");
|
||||
println!("{:?}", df.describe(None));
|
||||
} else if matches.get_flag("head") {
|
||||
let df = ldf.fetch(5).expect("Could not fetch");
|
||||
println!("{}", df)
|
||||
} else {
|
||||
if matches.get_flag("text") {
|
||||
io::dump_csv_to_stdout(ldf);
|
||||
} else {
|
||||
if let Some(path) = matches.get_one::<String>("parquet") {
|
||||
io::write_parquet(ldf, path.to_string());
|
||||
} else {
|
||||
io::write_ipc(ldf);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Handle the wpq command
|
||||
pub fn handle_wpq(matches: &ArgMatches) {
|
||||
let delimiter = match matches.get_one::<String>("delimiter") {
|
||||
Some(delimiter) => delimiter.as_bytes()[0],
|
||||
None => b',',
|
||||
};
|
||||
if let Some(path) = matches.get_one::<String>("path") {
|
||||
let ldf = if matches.get_flag("text") {
|
||||
io::load_csv_from_stdin(delimiter)
|
||||
} else {
|
||||
io::read_ipc()
|
||||
};
|
||||
io::write_parquet(ldf, path.to_string());
|
||||
} else {
|
||||
eprintln!("Could now write to parquet");
|
||||
}
|
||||
}
|
||||
|
||||
// Handle the schema command
|
||||
pub fn handle_schema(matches: &ArgMatches) {
|
||||
let delimiter = match matches.get_one::<String>("delimiter") {
|
||||
Some(delimiter) => delimiter.as_bytes()[0],
|
||||
None => b',',
|
||||
};
|
||||
let ldf = if matches.get_flag("stdin") {
|
||||
io::load_csv_from_stdin(delimiter)
|
||||
} else {
|
||||
io::read_ipc()
|
||||
};
|
||||
|
||||
if matches.get_flag("summary") {
|
||||
schema::print_schema(ldf);
|
||||
} else if matches.get_flag("postgresql") {
|
||||
let name = matches
|
||||
.get_one::<String>("name")
|
||||
.expect("Please provide a table name");
|
||||
let strlen: u32 = match matches.get_one::<String>("strlen") {
|
||||
Some(strlen) => strlen.parse::<u32>().unwrap(),
|
||||
None => 128,
|
||||
};
|
||||
schema::print_create(ldf, name.as_str(), strlen);
|
||||
}
|
||||
}
|
19
src/io.rs
19
src/io.rs
|
@ -5,8 +5,10 @@ use std::io::Read;
|
|||
use std::path::PathBuf;
|
||||
|
||||
/// Read CSV file
|
||||
pub fn read_csv(path: String) -> LazyFrame {
|
||||
pub fn read_csv(path: String, delimiter: u8) -> LazyFrame {
|
||||
LazyCsvReader::new(path)
|
||||
.with_delimiter(delimiter)
|
||||
.with_infer_schema_length(None)
|
||||
.finish()
|
||||
.expect("Could not load file")
|
||||
}
|
||||
|
@ -31,14 +33,14 @@ pub fn read_ipc() -> LazyFrame {
|
|||
}
|
||||
|
||||
/// Read CSV format from stdin and return a Polars DataFrame
|
||||
pub fn load_csv_from_stdin() -> LazyFrame {
|
||||
pub fn load_csv_from_stdin(delimiter: u8) -> LazyFrame {
|
||||
let mut buffer = Vec::new();
|
||||
let _res: () = match io::stdin().lock().read_to_end(&mut buffer) {
|
||||
Ok(_ok) => (),
|
||||
Err(_e) => (),
|
||||
};
|
||||
let cursor = io::Cursor::new(buffer);
|
||||
match CsvReader::new(cursor).finish() {
|
||||
match CsvReader::new(cursor).with_delimiter(delimiter).finish() {
|
||||
Ok(df) => df.lazy(),
|
||||
Err(_e) => LazyFrame::default(),
|
||||
}
|
||||
|
@ -76,7 +78,8 @@ pub fn dump_csv_to_stdout(ldf: LazyFrame) {
|
|||
}
|
||||
|
||||
/// Write a Polars DataFrame to Parquet
|
||||
pub fn write_parquet(ldf: LazyFrame, path: String) {
|
||||
/// Not yet supported in standard executor
|
||||
pub fn sink_parquet(ldf: LazyFrame, path: String) {
|
||||
// Selected compression not implemented yet
|
||||
let mut p = PathBuf::new();
|
||||
p.push(path);
|
||||
|
@ -92,3 +95,11 @@ pub fn write_parquet(ldf: LazyFrame, path: String) {
|
|||
)
|
||||
.expect("Could not save");
|
||||
}
|
||||
|
||||
pub fn write_parquet(ldf: LazyFrame, path: String) {
|
||||
// Selected compression not implemented yet
|
||||
let mut file = std::fs::File::create(path).unwrap();
|
||||
ParquetWriter::new(&mut file)
|
||||
.finish(&mut ldf.collect().expect("Could not collect"))
|
||||
.unwrap();
|
||||
}
|
||||
|
|
198
src/main.rs
198
src/main.rs
|
@ -1,176 +1,44 @@
|
|||
mod commands;
|
||||
mod handlers;
|
||||
mod io;
|
||||
mod schema;
|
||||
mod sql;
|
||||
use clap::{arg, command, ArgAction, Command};
|
||||
use polars_lazy::prelude::*;
|
||||
use clap::command;
|
||||
|
||||
fn main() {
|
||||
// Commands definition
|
||||
let matches = command!()
|
||||
.subcommand(
|
||||
Command::new("csv")
|
||||
.about("Read csv, output arrow stream")
|
||||
.arg(arg!([path] "Path to CSV file"))
|
||||
.arg(arg!(-d --delimiter <String> "Column delimiter. Assume ,").required(false))
|
||||
.arg(arg!(-q --query <String> "Execute query on the file").required(false))
|
||||
.arg(
|
||||
arg!(-s --summary ... "Summarize the data")
|
||||
.required(false)
|
||||
.action(ArgAction::SetTrue),
|
||||
)
|
||||
.arg(
|
||||
arg!(-t --text ... "Output text instead of binary")
|
||||
.required(false)
|
||||
.action(ArgAction::SetTrue),
|
||||
)
|
||||
.arg(arg!(-P --parquet <String> "Write output as a parquet file").required(false))
|
||||
.arg(
|
||||
arg!(-a --head ... "Print the header of the table")
|
||||
.required(false)
|
||||
.action(ArgAction::SetTrue),
|
||||
),
|
||||
)
|
||||
.subcommand(
|
||||
Command::new("sql")
|
||||
.about("Runs a sql statement on the file")
|
||||
.arg(arg!([statement] "SQL statement"))
|
||||
.arg(
|
||||
arg!(-t --text ... "Input text instead of binary")
|
||||
.required(false)
|
||||
.action(ArgAction::SetTrue),
|
||||
)
|
||||
.arg(arg!(-d --delimiter <String> "Column delimiter").required(false)),
|
||||
)
|
||||
.subcommand(
|
||||
Command::new("print").about("Pretty prints the table").arg(
|
||||
arg!(-t --text ... "Inputs csv instead of binary")
|
||||
.required(false)
|
||||
.action(ArgAction::SetTrue),
|
||||
),
|
||||
)
|
||||
.subcommand(
|
||||
Command::new("rpq")
|
||||
.about("Read parquet file")
|
||||
.arg(arg!([path] "Path to the parquet file"))
|
||||
.arg(arg!(-q --query <String> "Execute query on the file").required(false))
|
||||
.arg(
|
||||
arg!(-s --summary ... "Summarize the data")
|
||||
.required(false)
|
||||
.action(ArgAction::SetTrue),
|
||||
)
|
||||
.arg(
|
||||
arg!(-i --stdin ... "Read from stdin instead than from a file")
|
||||
.required(false)
|
||||
.action(ArgAction::SetTrue),
|
||||
)
|
||||
.arg(
|
||||
arg!(-t --text ... "Output text instead of binary")
|
||||
.required(false)
|
||||
.action(ArgAction::SetTrue),
|
||||
)
|
||||
.arg(
|
||||
arg!(-P --parquet <String> "Write the result as a parquet file")
|
||||
.required(false),
|
||||
)
|
||||
.arg(
|
||||
arg!(-a --head ... "Print the header of the table")
|
||||
.required(false)
|
||||
.action(ArgAction::SetTrue),
|
||||
),
|
||||
)
|
||||
.subcommand(
|
||||
Command::new("wpq")
|
||||
.about("Write to a paquet file")
|
||||
.arg(
|
||||
arg!(-t --text ... "Output text instead of binary")
|
||||
.required(false)
|
||||
.action(ArgAction::SetTrue),
|
||||
)
|
||||
.arg(arg!([path] "Path to the new parquet file")),
|
||||
.author("Guillem Borrell")
|
||||
.version(env!("CARGO_PKG_VERSION"))
|
||||
.about("dr is a handy command line tool to handle csv an parquet files")
|
||||
.long_about(
|
||||
"dr is a handy command line tool to handle csv and parquet files.
|
||||
It is designed to integrate nicely with other command line tools
|
||||
like cat, sed, awk and database clients cli. You can find more
|
||||
information an a short tutorial https://git.guillemborrell.es/guillem/dr
|
||||
",
|
||||
)
|
||||
.subcommand(commands::gen_csv_command())
|
||||
.subcommand(commands::gen_schema_command())
|
||||
.subcommand(commands::gen_sql_command())
|
||||
.subcommand(commands::gen_print_command())
|
||||
.subcommand(commands::gen_rpq_command())
|
||||
.subcommand(commands::gen_wpq_command())
|
||||
.get_matches();
|
||||
|
||||
if let Some(_matches) = matches.subcommand_matches("csv") {
|
||||
if let Some(path) = _matches.get_one::<String>("path") {
|
||||
let mut ldf = io::read_csv(path.to_string());
|
||||
if let Some(query) = _matches.get_one::<String>("query") {
|
||||
ldf = sql::execute(ldf, query);
|
||||
}
|
||||
if _matches.get_flag("summary") {
|
||||
let df = ldf.collect().expect("Could not collect");
|
||||
println!("{}", df.describe(None));
|
||||
} else if _matches.get_flag("head") {
|
||||
let df = ldf.fetch(5).expect("Could not fetch");
|
||||
println!("{}", df)
|
||||
} else {
|
||||
if _matches.get_flag("text") {
|
||||
io::dump_csv_to_stdout(ldf);
|
||||
} else {
|
||||
if let Some(path) = _matches.get_one::<String>("parquet") {
|
||||
io::write_parquet(ldf, path.to_string());
|
||||
} else {
|
||||
io::write_ipc(ldf);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if let Some(_matches) = matches.subcommand_matches("sql") {
|
||||
if let Some(statement) = _matches.get_one::<String>("statement") {
|
||||
let ldf = if _matches.get_flag("text") {
|
||||
io::load_csv_from_stdin()
|
||||
} else {
|
||||
io::read_ipc()
|
||||
};
|
||||
let res = sql::execute(ldf, statement);
|
||||
io::write_ipc(res);
|
||||
} else {
|
||||
io::write_ipc(io::read_ipc());
|
||||
}
|
||||
} else if let Some(_matches) = matches.subcommand_matches("print") {
|
||||
let df = if _matches.get_flag("text") {
|
||||
io::load_csv_from_stdin()
|
||||
} else {
|
||||
io::read_ipc()
|
||||
};
|
||||
println!("{}", df.collect().expect("Could not collect"));
|
||||
} else if let Some(_matches) = matches.subcommand_matches("rpq") {
|
||||
let mut ldf = LazyFrame::default();
|
||||
if _matches.get_flag("stdin") {
|
||||
ldf = io::load_parquet_from_stdin();
|
||||
} else if let Some(path) = _matches.get_one::<String>("path") {
|
||||
ldf = io::read_parquet(path.to_string());
|
||||
} else {
|
||||
eprintln!("File not found or not reading from stdin")
|
||||
}
|
||||
if let Some(query) = _matches.get_one::<String>("query") {
|
||||
ldf = sql::execute(ldf, query);
|
||||
}
|
||||
if _matches.get_flag("summary") {
|
||||
let df = ldf.collect().expect("Could not collect");
|
||||
println!("{}", df.describe(None));
|
||||
} else if _matches.get_flag("head") {
|
||||
let df = ldf.fetch(5).expect("Could not fetch");
|
||||
println!("{}", df)
|
||||
} else {
|
||||
if _matches.get_flag("text") {
|
||||
io::dump_csv_to_stdout(ldf);
|
||||
} else {
|
||||
if let Some(path) = _matches.get_one::<String>("parquet") {
|
||||
io::write_parquet(ldf, path.to_string());
|
||||
} else {
|
||||
io::write_ipc(ldf);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if let Some(_matches) = matches.subcommand_matches("wpq") {
|
||||
if let Some(path) = _matches.get_one::<String>("path") {
|
||||
let ldf = if _matches.get_flag("text") {
|
||||
io::load_csv_from_stdin()
|
||||
} else {
|
||||
io::read_ipc()
|
||||
};
|
||||
io::write_parquet(ldf, path.to_string());
|
||||
} else {
|
||||
eprintln!("Could now write to parquet");
|
||||
}
|
||||
// Send the flow to the corresponding handler
|
||||
if let Some(sub_matches) = matches.subcommand_matches("csv") {
|
||||
handlers::handle_csv(sub_matches);
|
||||
} else if let Some(sub_matches) = matches.subcommand_matches("sql") {
|
||||
handlers::handle_sql(sub_matches);
|
||||
} else if let Some(sub_matches) = matches.subcommand_matches("print") {
|
||||
handlers::handle_print(sub_matches);
|
||||
} else if let Some(sub_matches) = matches.subcommand_matches("rpq") {
|
||||
handlers::handle_rpq(sub_matches);
|
||||
} else if let Some(sub_matches) = matches.subcommand_matches("wpq") {
|
||||
handlers::handle_wpq(sub_matches);
|
||||
} else if let Some(sub_matches) = matches.subcommand_matches("schema") {
|
||||
handlers::handle_schema(sub_matches);
|
||||
} else {
|
||||
println!("No command provided. Please execute dr --help")
|
||||
}
|
||||
|
|
58
src/schema.rs
Normal file
58
src/schema.rs
Normal file
|
@ -0,0 +1,58 @@
|
|||
use polars_lazy::prelude::*;
|
||||
use sea_query::table::ColumnType;
|
||||
use sea_query::*;
|
||||
|
||||
pub fn print_schema(ldf: LazyFrame) {
|
||||
let schema = ldf.schema().expect("Could not retreive schema");
|
||||
for f in schema.iter_fields() {
|
||||
let mut unnamed_cols_counter = 0;
|
||||
let d = f.data_type().to_string();
|
||||
let n = if f.name.is_empty() {
|
||||
unnamed_cols_counter += 1;
|
||||
format!("Column{}", unnamed_cols_counter)
|
||||
} else {
|
||||
f.name
|
||||
};
|
||||
|
||||
println!("{n} ({d})");
|
||||
}
|
||||
}
|
||||
|
||||
pub fn print_create(ldf: LazyFrame, table_name: &str, default_strlen: u32) {
|
||||
let schema = ldf.schema().expect("Could not retreive schema");
|
||||
// Create empty table
|
||||
let mut statements = vec![Table::create()
|
||||
.table(Alias::new(table_name))
|
||||
.if_not_exists()
|
||||
.to_string(PostgresQueryBuilder)];
|
||||
|
||||
// Alter table adding fields one by one
|
||||
let mut unnamed_cols_counter = 0;
|
||||
for f in schema.iter_fields() {
|
||||
let dtype = match f.data_type().to_string().as_str() {
|
||||
"i64" => ColumnType::Integer,
|
||||
"f64" => ColumnType::Float,
|
||||
"str" => ColumnType::String(Some(default_strlen)),
|
||||
"bool" => ColumnType::Boolean,
|
||||
&_ => todo!("Datatype {} not supported", f.data_type().to_string()),
|
||||
};
|
||||
|
||||
let name = if f.name.is_empty() {
|
||||
unnamed_cols_counter += 1;
|
||||
format!("Column{}", unnamed_cols_counter)
|
||||
} else {
|
||||
f.name
|
||||
};
|
||||
|
||||
let table = Table::alter()
|
||||
.table(Alias::new(table_name))
|
||||
.add_column(&mut ColumnDef::new_with_type(Alias::new(&name), dtype))
|
||||
.to_owned();
|
||||
statements.push(table.to_string(PostgresQueryBuilder));
|
||||
}
|
||||
|
||||
// Finallyls print all statements
|
||||
for statement in statements {
|
||||
println!("{};", statement);
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue