Compare commits

..

No commits in common. "main" and "0.6.1" have entirely different histories.
main ... 0.6.1

5 changed files with 262 additions and 372 deletions

View file

@ -1,7 +1,7 @@
[package] [package]
name = "dr" name = "dr"
description = "Command-line data file processing in Rust" description = "Command-line data file processing in Rust"
version = "0.7.0" version = "0.6.1"
edition = "2021" edition = "2021"
include = [ include = [
"**/*.rs", "**/*.rs",
@ -14,8 +14,8 @@ repository = "https://git.guillemborrell.es/guillem/dr"
[dependencies] [dependencies]
clap = {version = "4.0", features = ["cargo"]} clap = {version = "4.0", features = ["cargo"]}
polars-lazy = {"version" = "0.27", "features" = ["parquet", "ipc", "csv-file"]} polars-lazy = {"version" = "0.26", "features" = ["parquet", "ipc", "csv-file"]}
polars-core = {"version" = "0.27", "features" = ["describe", "fmt"]} polars-core = {"version" = "0.26", "features" = ["describe", "fmt"]}
polars-io = {"version" = "0.27", "features" = ["ipc_streaming"]} polars-io = {"version" = "0.26", "features" = ["ipc_streaming"]}
polars-sql = {"version" = "0.2.3"} polars-sql = {"version" = "0.2.2"}
sea-query = {"version" = "0.28"} sea-query = {"version" = "0.28"}

View file

@ -16,36 +16,21 @@ You can install dr the rust way with `cargo install dr` but downloading a binary
``` ```
$ dr --help $ dr --help
dr is a handy command line tool to handle csv and parquet files. Command-line data file processing in Rust
It is designed to integrate nicely with other command line tools
like cat, sed, awk and database clients cli. You can find more
information an a short tutorial https://git.guillemborrell.es/guillem/dr
Usage: dr [COMMAND] Usage: dr [COMMAND]
Commands: Commands:
csv csv Read csv, output arrow stream
Read csv, output arrow stream sql Runs a sql statement on the file
schema print Pretty prints the table
Several table schema related utilities rpq Read parquet file
sql wpq Write to a paquet file
Runs a sql statement on the file help Print this message or the help of the given subcommand(s)
print
Pretty prints the table
rpq
Read parquet file
wpq
Write to a paquet file
help
Print this message or the help of the given subcommand(s)
Options: Options:
-h, --help -h, --help Print help information
Print help information (use `-h` for a summary) -V, --version Print version information
-V, --version
Print version information
``` ```
## Howto ## Howto
@ -151,7 +136,12 @@ ALTER TABLE "wine" ADD COLUMN "OD" real;
ALTER TABLE "wine" ADD COLUMN "Proline" integer; ALTER TABLE "wine" ADD COLUMN "Proline" integer;
``` ```
More about this in the Examples section If you're fine with dr's choices you can then create the table and insert the file
```
$ head wine.csv | dr schema -i -p -n wine | psql
$ tail -n +2 wine.csv | psql -c "\copy wine from stdin with (FORMAT 'csv')"
```
Since most databases can ingest and spit CSV files, some simple operations can be enhanced with dr, like storing the results of a query in a parquet file Since most databases can ingest and spit CSV files, some simple operations can be enhanced with dr, like storing the results of a query in a parquet file
@ -175,41 +165,6 @@ Some commands that convert raw input in ipc format
* Read from stdin in csv and pretty print the table: `dr print -t` * Read from stdin in csv and pretty print the table: `dr print -t`
* Read from stdin in ipc and write the data in parquet: `dr wpq [file.pq]` * Read from stdin in ipc and write the data in parquet: `dr wpq [file.pq]`
Some commands that read csv data from stdin
* Read csv from stdin and print the schema as it would be inserted in a postgresql database: `dr schema -i -p -n tablename`
* Reas csv from stdin and save as parquet, inferring types: `dr csv -i -P filename.pq`
## Examples
### Inserting CSV into postgres
Assume that you were given a large (several GiB) with a weird (latin1) encoding, and you want to insert it into postgres. This dataset may be too large to store it in memory in one go, so you'd like to stream it into the database. You need to
* Read the csv file
* Infer the schema, and create a table
* Change the encoding of the file to the same as the database
You can use `dr` to turn this into a two-step process, and pipe the encoding conversion in one go. The first step would be to infer the schema of the resulting table and creating the table
```
$ head large_csv_file.csv | iconv -f latin1 -t utf-8 | dr schema -i -p -n tablename | pgsql -U username -h hostname database
```
The second step would be leveraging the `pgsql` command to write the contents of the file into the database
```
$ cat large_csv_file.csv | iconv -f latin1 -t UTF-8 | psql -U username -h hostname -c "\copy tablename from stdin with (FORMAT 'csv', HEADER)" database
```
The ingestion process is atomic, meaning that if `pgsql` fails to insert any record, no insertions will be made at all. If the insertion fails, probably because some column of type varchar can't fit the inferred type, you can change the type with:
```
$ psql -U username -h hostname -c 'alter table tablename alter column "LongDescription" type varchar(1024);' database
```
And try inserting again
## Performance ## Performance
This command runs two dr processes. The first one makes an aggregation on a compressed parquet file of 144MB of size, and the second one just orders the result: This command runs two dr processes. The first one makes an aggregation on a compressed parquet file of 144MB of size, and the second one just orders the result:
@ -225,7 +180,7 @@ On a very very old machine (Intel(R) Core(TM) i5-6500T CPU @ 2.50GHz), this take
## Caveats ## Caveats
1. `dr` uses Polars to build and transform dataframes in Rust, and the entire table may be loaded in memory. At the time when `dr` was created, streaming support didn't get along very well with SQL contexts. 1. `dr` uses Polars to build and transform dataframes in Rust, and the entire table has to be loaded in memory. At the time when `dr` was created, streaming support didn't get along very well with SQL contexts.
2. `dr` uses Polars' SQLContext to execute the query which supports a small subset of the SQL language. 2. `dr` uses Polars' SQLContext to execute the query which supports a small subset of the SQL language.

View file

@ -1,123 +0,0 @@
use clap::{arg, ArgAction, Command};
// Generate command line options for the csv command
pub fn gen_csv_command() -> Command {
Command::new("csv")
.about("Read csv, output arrow stream")
.arg(arg!([path] "Path to CSV file"))
.arg(arg!(-d --delimiter <String> "Column delimiter. Assume ,").required(false))
.arg(
arg!(-i --stdin ... "Read from stdin")
.required(false)
.action(ArgAction::SetTrue),
)
.arg(arg!(-q --query <String> "Execute query on the file").required(false))
.arg(
arg!(-s --summary ... "Summarize the data")
.required(false)
.action(ArgAction::SetTrue),
)
.arg(
arg!(-t --text ... "Output text instead of binary")
.required(false)
.action(ArgAction::SetTrue),
)
.arg(arg!(-P --parquet <String> "Write output as a parquet file").required(false))
.arg(
arg!(-a --head ... "Print the header of the table")
.required(false)
.action(ArgAction::SetTrue),
)
}
// Generate command line options for the schema command
pub fn gen_schema_command() -> Command {
Command::new("schema")
.about("Several table schema related utilities")
.arg(
arg!(-i --stdin ... "Read from stdin")
.required(false)
.action(ArgAction::SetTrue),
)
.arg(arg!(-d --delimiter <String> "Column delimiter. Assume ,").required(false))
.arg(arg!(-n --name <String> "Table name").required(false))
.arg(arg!(-l --strlen <String> "Default length for string columns").required(false))
.arg(
arg!(-s --summary ... "Summarize the schema")
.required(false)
.action(ArgAction::SetTrue),
)
.arg(
arg!(-p --postgresql ... "Create a postgresql table with schema")
.required(false)
.action(ArgAction::SetTrue),
)
}
// Generate command line options for the sql command
pub fn gen_sql_command() -> Command {
Command::new("sql")
.about("Runs a sql statement on the file")
.arg(arg!(-d --delimiter <String> "Column delimiter. Assume ,").required(false))
.arg(arg!([statement] "SQL statement"))
.arg(
arg!(-t --text ... "Input text instead of binary")
.required(false)
.action(ArgAction::SetTrue),
)
.arg(arg!(-d --delimiter <String> "Column delimiter").required(false))
}
// Generate command line options for the rpq command
pub fn gen_rpq_command() -> Command {
Command::new("rpq")
.about("Read parquet file")
.arg(arg!([path] "Path to the parquet file"))
.arg(arg!(-q --query <String> "Execute query on the file").required(false))
.arg(
arg!(-s --summary ... "Summarize the data")
.required(false)
.action(ArgAction::SetTrue),
)
.arg(
arg!(-i --stdin ... "Read from stdin instead than from a file")
.required(false)
.action(ArgAction::SetTrue),
)
.arg(
arg!(-t --text ... "Output text instead of binary")
.required(false)
.action(ArgAction::SetTrue),
)
.arg(arg!(-P --parquet <String> "Write the result as a parquet file").required(false))
.arg(
arg!(-a --head ... "Print the header of the table")
.required(false)
.action(ArgAction::SetTrue),
)
}
// Generate command line options for the wpq command
pub fn gen_wpq_command() -> Command {
Command::new("wpq")
.about("Write to a paquet file")
.arg(arg!(-d --delimiter <String> "Column delimiter. Assume ,").required(false))
.arg(
arg!(-t --text ... "Input text instead of binary")
.required(false)
.action(ArgAction::SetTrue),
)
.arg(arg!([path] "Path to the new parquet file"))
}
// Generate command line options for the print command
pub fn gen_print_command() -> Command {
Command::new("print")
.about("Pretty prints the table")
.arg(arg!(-d --delimiter <String> "Column delimiter. Assume ,").required(false))
.arg(
arg!(-t --text ... "Inputs csv instead of binary")
.required(false)
.action(ArgAction::SetTrue),
)
}

View file

@ -1,150 +0,0 @@
use crate::io;
use crate::schema;
use crate::sql;
use clap::ArgMatches;
use polars_lazy::prelude::LazyFrame;
// Handle csv command
pub fn handle_csv(matches: &ArgMatches) {
let delimiter = match matches.get_one::<String>("delimiter") {
Some(delimiter) => delimiter.as_bytes()[0],
None => b',',
};
let mut ldf = if matches.get_flag("stdin") {
io::load_csv_from_stdin(delimiter)
} else {
let path = matches
.get_one::<String>("path")
.expect("Please, provide a file");
io::read_csv(path.to_string(), delimiter)
};
if let Some(query) = matches.get_one::<String>("query") {
ldf = sql::execute(ldf, query);
}
if matches.get_flag("summary") {
let df = ldf.collect().expect("Could not collect");
println!("{:?}", df.describe(None));
} else if matches.get_flag("head") {
let df = ldf.fetch(5).expect("Could not fetch");
println!("{}", df)
} else {
if matches.get_flag("text") {
io::dump_csv_to_stdout(ldf);
} else {
if let Some(path) = matches.get_one::<String>("parquet") {
io::write_parquet(ldf, path.to_string());
} else {
io::write_ipc(ldf);
}
}
}
}
// Handle the SQL command
pub fn handle_sql(matches: &ArgMatches) {
let delimiter = match matches.get_one::<String>("delimiter") {
Some(delimiter) => delimiter.as_bytes()[0],
None => b',',
};
if let Some(statement) = matches.get_one::<String>("statement") {
let ldf = if matches.get_flag("text") {
io::load_csv_from_stdin(delimiter)
} else {
io::read_ipc()
};
let res = sql::execute(ldf, statement);
io::write_ipc(res);
} else {
io::write_ipc(io::read_ipc());
}
}
// Handle the print command
pub fn handle_print(matches: &ArgMatches) {
let delimiter = match matches.get_one::<String>("delimiter") {
Some(delimiter) => delimiter.as_bytes()[0],
None => b',',
};
let df = if matches.get_flag("text") {
io::load_csv_from_stdin(delimiter)
} else {
io::read_ipc()
};
println!("{}", df.collect().expect("Could not collect"));
}
// Handle the rpq command
pub fn handle_rpq(matches: &ArgMatches) {
let mut ldf = LazyFrame::default();
if matches.get_flag("stdin") {
ldf = io::load_parquet_from_stdin();
} else if let Some(path) = matches.get_one::<String>("path") {
ldf = io::read_parquet(path.to_string());
} else {
eprintln!("File not found or not reading from stdin")
}
if let Some(query) = matches.get_one::<String>("query") {
ldf = sql::execute(ldf, query);
}
if matches.get_flag("summary") {
let df = ldf.collect().expect("Could not collect");
println!("{:?}", df.describe(None));
} else if matches.get_flag("head") {
let df = ldf.fetch(5).expect("Could not fetch");
println!("{}", df)
} else {
if matches.get_flag("text") {
io::dump_csv_to_stdout(ldf);
} else {
if let Some(path) = matches.get_one::<String>("parquet") {
io::write_parquet(ldf, path.to_string());
} else {
io::write_ipc(ldf);
}
}
}
}
// Handle the wpq command
pub fn handle_wpq(matches: &ArgMatches) {
let delimiter = match matches.get_one::<String>("delimiter") {
Some(delimiter) => delimiter.as_bytes()[0],
None => b',',
};
if let Some(path) = matches.get_one::<String>("path") {
let ldf = if matches.get_flag("text") {
io::load_csv_from_stdin(delimiter)
} else {
io::read_ipc()
};
io::write_parquet(ldf, path.to_string());
} else {
eprintln!("Could now write to parquet");
}
}
// Handle the schema command
pub fn handle_schema(matches: &ArgMatches) {
let delimiter = match matches.get_one::<String>("delimiter") {
Some(delimiter) => delimiter.as_bytes()[0],
None => b',',
};
let ldf = if matches.get_flag("stdin") {
io::load_csv_from_stdin(delimiter)
} else {
io::read_ipc()
};
if matches.get_flag("summary") {
schema::print_schema(ldf);
} else if matches.get_flag("postgresql") {
let name = matches
.get_one::<String>("name")
.expect("Please provide a table name");
let strlen: u32 = match matches.get_one::<String>("strlen") {
Some(strlen) => strlen.parse::<u32>().unwrap(),
None => 128,
};
schema::print_create(ldf, name.as_str(), strlen);
}
}

View file

@ -1,44 +1,252 @@
mod commands;
mod handlers;
mod io; mod io;
mod schema; mod schema;
mod sql; mod sql;
use clap::command; use clap::{arg, command, ArgAction, Command};
use polars_lazy::prelude::*;
fn main() { fn main() {
// Commands definition
let matches = command!() let matches = command!()
.author("Guillem Borrell") .subcommand(
.version(env!("CARGO_PKG_VERSION")) Command::new("csv")
.about("dr is a handy command line tool to handle csv an parquet files") .about("Read csv, output arrow stream")
.long_about( .arg(arg!([path] "Path to CSV file"))
"dr is a handy command line tool to handle csv and parquet files. .arg(arg!(-d --delimiter <String> "Column delimiter. Assume ,").required(false))
It is designed to integrate nicely with other command line tools .arg(
like cat, sed, awk and database clients cli. You can find more arg!(-i --stdin ... "Read from stdin")
information an a short tutorial https://git.guillemborrell.es/guillem/dr .required(false)
", .action(ArgAction::SetTrue),
)
.arg(arg!(-q --query <String> "Execute query on the file").required(false))
.arg(
arg!(-s --summary ... "Summarize the data")
.required(false)
.action(ArgAction::SetTrue),
)
.arg(
arg!(-t --text ... "Output text instead of binary")
.required(false)
.action(ArgAction::SetTrue),
)
.arg(arg!(-P --parquet <String> "Write output as a parquet file").required(false))
.arg(
arg!(-a --head ... "Print the header of the table")
.required(false)
.action(ArgAction::SetTrue),
),
)
.subcommand(
Command::new("schema")
.about("Several table schema related utilities")
.arg(
arg!(-i --stdin ... "Read from stdin")
.required(false)
.action(ArgAction::SetTrue),
)
.arg(arg!(-d --delimiter <String> "Column delimiter. Assume ,").required(false))
.arg(arg!(-n --name <String> "Table name").required(false))
.arg(arg!(-l --strlen <String> "Default length for string columns").required(false))
.arg(
arg!(-s --summary ... "Summarize the schema")
.required(false)
.action(ArgAction::SetTrue),
)
.arg(
arg!(-p --postgresql ... "Create a postgresql table with schema")
.required(false)
.action(ArgAction::SetTrue),
),
)
.subcommand(
Command::new("sql")
.about("Runs a sql statement on the file")
.arg(arg!(-d --delimiter <String> "Column delimiter. Assume ,").required(false))
.arg(arg!([statement] "SQL statement"))
.arg(
arg!(-t --text ... "Input text instead of binary")
.required(false)
.action(ArgAction::SetTrue),
)
.arg(arg!(-d --delimiter <String> "Column delimiter").required(false)),
)
.subcommand(
Command::new("print")
.about("Pretty prints the table")
.arg(arg!(-d --delimiter <String> "Column delimiter. Assume ,").required(false))
.arg(
arg!(-t --text ... "Inputs csv instead of binary")
.required(false)
.action(ArgAction::SetTrue),
),
)
.subcommand(
Command::new("rpq")
.about("Read parquet file")
.arg(arg!([path] "Path to the parquet file"))
.arg(arg!(-q --query <String> "Execute query on the file").required(false))
.arg(
arg!(-s --summary ... "Summarize the data")
.required(false)
.action(ArgAction::SetTrue),
)
.arg(
arg!(-i --stdin ... "Read from stdin instead than from a file")
.required(false)
.action(ArgAction::SetTrue),
)
.arg(
arg!(-t --text ... "Output text instead of binary")
.required(false)
.action(ArgAction::SetTrue),
)
.arg(
arg!(-P --parquet <String> "Write the result as a parquet file")
.required(false),
)
.arg(
arg!(-a --head ... "Print the header of the table")
.required(false)
.action(ArgAction::SetTrue),
),
)
.subcommand(
Command::new("wpq")
.about("Write to a paquet file")
.arg(arg!(-d --delimiter <String> "Column delimiter. Assume ,").required(false))
.arg(
arg!(-t --text ... "Input text instead of binary")
.required(false)
.action(ArgAction::SetTrue),
)
.arg(arg!([path] "Path to the new parquet file")),
) )
.subcommand(commands::gen_csv_command())
.subcommand(commands::gen_schema_command())
.subcommand(commands::gen_sql_command())
.subcommand(commands::gen_print_command())
.subcommand(commands::gen_rpq_command())
.subcommand(commands::gen_wpq_command())
.get_matches(); .get_matches();
if let Some(_matches) = matches.subcommand_matches("csv") {
let delimiter = match _matches.get_one::<String>("delimiter") {
Some(delimiter) => delimiter.as_bytes()[0],
None => b',',
};
let mut ldf = if _matches.get_flag("stdin") {
io::load_csv_from_stdin(delimiter)
} else {
let path = _matches
.get_one::<String>("path")
.expect("Please, provide a file");
io::read_csv(path.to_string(), delimiter)
};
if let Some(query) = _matches.get_one::<String>("query") {
ldf = sql::execute(ldf, query);
}
if _matches.get_flag("summary") {
let df = ldf.collect().expect("Could not collect");
println!("{}", df.describe(None));
} else if _matches.get_flag("head") {
let df = ldf.fetch(5).expect("Could not fetch");
println!("{}", df)
} else {
if _matches.get_flag("text") {
io::dump_csv_to_stdout(ldf);
} else {
if let Some(path) = _matches.get_one::<String>("parquet") {
io::write_parquet(ldf, path.to_string());
} else {
io::write_ipc(ldf);
}
}
}
} else if let Some(_matches) = matches.subcommand_matches("sql") {
let delimiter = match _matches.get_one::<String>("delimiter") {
Some(delimiter) => delimiter.as_bytes()[0],
None => b',',
};
if let Some(statement) = _matches.get_one::<String>("statement") {
let ldf = if _matches.get_flag("text") {
io::load_csv_from_stdin(delimiter)
} else {
io::read_ipc()
};
let res = sql::execute(ldf, statement);
io::write_ipc(res);
} else {
io::write_ipc(io::read_ipc());
}
} else if let Some(_matches) = matches.subcommand_matches("print") {
let delimiter = match _matches.get_one::<String>("delimiter") {
Some(delimiter) => delimiter.as_bytes()[0],
None => b',',
};
let df = if _matches.get_flag("text") {
io::load_csv_from_stdin(delimiter)
} else {
io::read_ipc()
};
println!("{}", df.collect().expect("Could not collect"));
} else if let Some(_matches) = matches.subcommand_matches("rpq") {
let mut ldf = LazyFrame::default();
if _matches.get_flag("stdin") {
ldf = io::load_parquet_from_stdin();
} else if let Some(path) = _matches.get_one::<String>("path") {
ldf = io::read_parquet(path.to_string());
} else {
eprintln!("File not found or not reading from stdin")
}
if let Some(query) = _matches.get_one::<String>("query") {
ldf = sql::execute(ldf, query);
}
if _matches.get_flag("summary") {
let df = ldf.collect().expect("Could not collect");
println!("{}", df.describe(None));
} else if _matches.get_flag("head") {
let df = ldf.fetch(5).expect("Could not fetch");
println!("{}", df)
} else {
if _matches.get_flag("text") {
io::dump_csv_to_stdout(ldf);
} else {
if let Some(path) = _matches.get_one::<String>("parquet") {
io::write_parquet(ldf, path.to_string());
} else {
io::write_ipc(ldf);
}
}
}
} else if let Some(_matches) = matches.subcommand_matches("wpq") {
let delimiter = match _matches.get_one::<String>("delimiter") {
Some(delimiter) => delimiter.as_bytes()[0],
None => b',',
};
if let Some(path) = _matches.get_one::<String>("path") {
let ldf = if _matches.get_flag("text") {
io::load_csv_from_stdin(delimiter)
} else {
io::read_ipc()
};
io::write_parquet(ldf, path.to_string());
} else {
eprintln!("Could now write to parquet");
}
} else if let Some(_matches) = matches.subcommand_matches("schema") {
let delimiter = match _matches.get_one::<String>("delimiter") {
Some(delimiter) => delimiter.as_bytes()[0],
None => b',',
};
let ldf = if _matches.get_flag("stdin") {
io::load_csv_from_stdin(delimiter)
} else {
io::read_ipc()
};
// Send the flow to the corresponding handler if _matches.get_flag("summary") {
if let Some(sub_matches) = matches.subcommand_matches("csv") { schema::print_schema(ldf);
handlers::handle_csv(sub_matches); } else if _matches.get_flag("postgresql") {
} else if let Some(sub_matches) = matches.subcommand_matches("sql") { let name = _matches
handlers::handle_sql(sub_matches); .get_one::<String>("name")
} else if let Some(sub_matches) = matches.subcommand_matches("print") { .expect("Please provide a table name");
handlers::handle_print(sub_matches); let strlen: u32 = match _matches.get_one::<String>("strlen") {
} else if let Some(sub_matches) = matches.subcommand_matches("rpq") { Some(strlen) => strlen.parse::<u32>().unwrap(),
handlers::handle_rpq(sub_matches); None => 128,
} else if let Some(sub_matches) = matches.subcommand_matches("wpq") { };
handlers::handle_wpq(sub_matches); schema::print_create(ldf, name.as_str(), strlen);
} else if let Some(sub_matches) = matches.subcommand_matches("schema") { }
handlers::handle_schema(sub_matches);
} else { } else {
println!("No command provided. Please execute dr --help") println!("No command provided. Please execute dr --help")
} }