diff --git a/src/commands.rs b/src/commands.rs
new file mode 100644
index 0000000..c3d9b73
--- /dev/null
+++ b/src/commands.rs
@@ -0,0 +1,91 @@
+//! Subcommand definitions for the command-line interface.
+//!
+//! Every `gen_*_command` function builds one `clap` subcommand; `main`
+//! attaches them to the top-level command.
+
+use clap::{arg, Arg, ArgAction, Command};
+
+/// Builds the optional `-d/--delimiter` value option shared by the
+/// subcommands that accept a column delimiter.
+fn delimiter_arg() -> Arg {
+    arg!(-d --delimiter <delimiter> "Column delimiter. Assume ,").required(false)
+}
+
+/// Builds a boolean switch that is set to `true` only when it is passed.
+///
+/// The argument id is `long`, so it is read back with `get_flag(long)`.
+fn switch(short: char, long: &'static str, help: &'static str) -> Arg {
+    Arg::new(long)
+        .short(short)
+        .long(long)
+        .help(help)
+        .action(ArgAction::SetTrue)
+}
+
+/// Defines the `csv` subcommand.
+pub fn gen_csv_command() -> Command {
+    Command::new("csv")
+        .about("Read csv, output arrow stream")
+        .arg(arg!([path] "Path to CSV file"))
+        .arg(delimiter_arg())
+        .arg(switch('i', "stdin", "Read from stdin"))
+        .arg(arg!(-q --query <query> "Execute query on the file").required(false))
+        .arg(switch('s', "summary", "Summarize the data"))
+        .arg(switch('t', "text", "Output text instead of binary"))
+        .arg(arg!(-P --parquet <parquet> "Write output as a parquet file").required(false))
+        .arg(switch('a', "head", "Print the header of the table"))
+}
+
+/// Defines the `schema` subcommand.
+pub fn gen_schema_command() -> Command {
+    Command::new("schema")
+        .about("Several table schema related utilities")
+        .arg(switch('i', "stdin", "Read from stdin"))
+        .arg(delimiter_arg())
+        .arg(arg!(-n --name <name> "Table name").required(false))
+        .arg(arg!(-l --strlen <strlen> "Default length for string columns").required(false))
+        .arg(switch('s', "summary", "Summarize the schema"))
+        .arg(switch('p', "postgresql", "Create a postgresql table with schema"))
+}
+
+/// Defines the `sql` subcommand.
+///
+/// `--delimiter` is registered once; clap requires argument ids to be unique
+/// within a command.
+pub fn gen_sql_command() -> Command {
+    Command::new("sql")
+        .about("Runs a sql statement on the file")
+        .arg(delimiter_arg())
+        .arg(arg!([statement] "SQL statement"))
+        .arg(switch('t', "text", "Input text instead of binary"))
+}
+
+/// Defines the `rpq` subcommand.
+pub fn gen_rpq_command() -> Command {
+    Command::new("rpq")
+        .about("Read parquet file")
+        .arg(arg!([path] "Path to the parquet file"))
+        .arg(arg!(-q --query <query> "Execute query on the file").required(false))
+        .arg(switch('s', "summary", "Summarize the data"))
+        .arg(switch('i', "stdin", "Read from stdin instead of from a file"))
+        .arg(switch('t', "text", "Output text instead of binary"))
+        .arg(arg!(-P --parquet <parquet> "Write the result as a parquet file").required(false))
+        .arg(switch('a', "head", "Print the header of the table"))
+}
+
+/// Defines the `wpq` subcommand.
+pub fn gen_wpq_command() -> Command {
+    Command::new("wpq")
+        .about("Write to a parquet file")
+        .arg(delimiter_arg())
+        .arg(switch('t', "text", "Input text instead of binary"))
+        .arg(arg!([path] "Path to the new parquet file"))
+}
+
+/// Defines the `print` subcommand.
+pub fn gen_print_command() -> Command {
+    Command::new("print")
+        .about("Pretty prints the table")
+        .arg(delimiter_arg())
+        .arg(switch('t', "text", "Inputs csv instead of binary"))
+}
diff --git a/src/handlers.rs b/src/handlers.rs
new file mode 100644
index 0000000..b360e88
--- /dev/null
+++ b/src/handlers.rs
@@ -0,0 +1,148 @@
+//! Handlers that run a subcommand once its arguments have been parsed.
+
+use crate::io;
+use crate::schema;
+use crate::sql;
+use clap::ArgMatches;
+use polars_lazy::prelude::LazyFrame;
+
+/// Delimiter used when `--delimiter` is missing or empty.
+const DEFAULT_DELIMITER: u8 = b',';
+
+/// Length passed to `schema::print_create` when `--strlen` is missing.
+const DEFAULT_STRLEN: u32 = 128;
+
+/// Reports a usage error on stderr and exits with a non-zero status.
+fn fail(message: &str) -> ! {
+    eprintln!("{}", message);
+    std::process::exit(1)
+}
+
+/// Returns the first byte of `--delimiter`, falling back to a comma when the
+/// option is missing or empty so that an empty value cannot panic.
+fn delimiter_from(matches: &ArgMatches) -> u8 {
+    matches
+        .get_one::<String>("delimiter")
+        .and_then(|raw| raw.bytes().next())
+        .unwrap_or(DEFAULT_DELIMITER)
+}
+
+/// Loads a frame from stdin: CSV when the switch named `csv_flag` is set,
+/// otherwise through `io::read_ipc`.
+fn frame_from_stdin(matches: &ArgMatches, csv_flag: &str) -> LazyFrame {
+    if matches.get_flag(csv_flag) {
+        io::load_csv_from_stdin(delimiter_from(matches))
+    } else {
+        io::read_ipc()
+    }
+}
+
+/// Runs the optional `--query` statement against `ldf`.
+fn apply_query(ldf: LazyFrame, matches: &ArgMatches) -> LazyFrame {
+    match matches.get_one::<String>("query") {
+        Some(query) => sql::execute(ldf, query),
+        None => ldf,
+    }
+}
+
+/// Output stage shared by `csv` and `rpq`.
+///
+/// Precedence: `--summary`, then `--head`, then `--text`, then `--parquet`;
+/// with none of them the frame goes to `io::write_ipc`.
+fn emit(ldf: LazyFrame, matches: &ArgMatches) {
+    if matches.get_flag("summary") {
+        let df = ldf.collect().expect("Could not collect");
+        println!("{}", df.describe(None));
+    } else if matches.get_flag("head") {
+        let df = ldf.fetch(5).expect("Could not fetch");
+        println!("{}", df);
+    } else if matches.get_flag("text") {
+        io::dump_csv_to_stdout(ldf);
+    } else if let Some(path) = matches.get_one::<String>("parquet") {
+        io::write_parquet(ldf, path.to_string());
+    } else {
+        io::write_ipc(ldf);
+    }
+}
+
+/// Handles `csv`: loads CSV from stdin or `path`, applies `--query`, then emits.
+pub fn handle_csv(matches: &ArgMatches) {
+    let delimiter = delimiter_from(matches);
+    let ldf = if matches.get_flag("stdin") {
+        io::load_csv_from_stdin(delimiter)
+    } else {
+        match matches.get_one::<String>("path") {
+            Some(path) => io::read_csv(path.to_string(), delimiter),
+            None => fail("Please, provide a file"),
+        }
+    };
+    emit(apply_query(ldf, matches), matches);
+}
+
+/// Handles `sql`: runs the optional statement on the stdin frame and writes
+/// the result with `io::write_ipc`.
+///
+/// `--text` is honoured even when no statement is given.
+pub fn handle_sql(matches: &ArgMatches) {
+    let ldf = frame_from_stdin(matches, "text");
+    let result = match matches.get_one::<String>("statement") {
+        Some(statement) => sql::execute(ldf, statement),
+        None => ldf,
+    };
+    io::write_ipc(result);
+}
+
+/// Handles `print`: collects the stdin frame and prints it.
+pub fn handle_print(matches: &ArgMatches) {
+    let ldf = frame_from_stdin(matches, "text");
+    println!("{}", ldf.collect().expect("Could not collect"));
+}
+
+/// Handles `rpq`: loads parquet from stdin or `path`, applies `--query`, then
+/// emits.
+///
+/// Exits with an error when neither input is given instead of continuing
+/// with an empty frame.
+pub fn handle_rpq(matches: &ArgMatches) {
+    let ldf = if matches.get_flag("stdin") {
+        io::load_parquet_from_stdin()
+    } else if let Some(path) = matches.get_one::<String>("path") {
+        io::read_parquet(path.to_string())
+    } else {
+        fail("File not found or not reading from stdin")
+    };
+    emit(apply_query(ldf, matches), matches);
+}
+
+/// Handles `wpq`: writes the stdin frame to the parquet file at `path`.
+pub fn handle_wpq(matches: &ArgMatches) {
+    match matches.get_one::<String>("path") {
+        Some(path) => {
+            io::write_parquet(frame_from_stdin(matches, "text"), path.to_string());
+        }
+        None => fail("Could not write to parquet"),
+    }
+}
+
+/// Handles `schema`: dispatches to `schema::print_schema` or `schema::print_create`.
+///
+/// Arguments are validated before stdin is read.
+pub fn handle_schema(matches: &ArgMatches) {
+    if matches.get_flag("summary") {
+        schema::print_schema(frame_from_stdin(matches, "stdin"));
+    } else if matches.get_flag("postgresql") {
+        let name = match matches.get_one::<String>("name") {
+            Some(name) => name,
+            None => fail("Please provide a table name"),
+        };
+        let strlen = match matches.get_one::<String>("strlen") {
+            Some(raw) => raw
+                .parse::<u32>()
+                .unwrap_or_else(|_| fail("--strlen must be a non-negative integer")),
+            None => DEFAULT_STRLEN,
+        };
+        schema::print_create(frame_from_stdin(matches, "stdin"), name.as_str(), strlen);
+    } else {
+        fail("Nothing to do: pass --summary or --postgresql");
+    }
+}
diff --git a/src/main.rs b/src/main.rs
index 8019c76..4239702 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,252 +1,31 @@
+mod commands;
+mod handlers;
 mod io;
 mod schema;
 mod sql;
-use clap::{arg, command, ArgAction, Command};
-use polars_lazy::prelude::*;
+use clap::command;
 
 fn main() {
     let matches = command!()
-        .subcommand(
-            Command::new("csv")
-                .about("Read csv, output arrow stream")
-                .arg(arg!([path] "Path to CSV file"))
-                .arg(arg!(-d --delimiter <delimiter> "Column delimiter. Assume ,").required(false))
-                .arg(
-                    arg!(-i --stdin ... "Read from stdin")
-                        .required(false)
-                        .action(ArgAction::SetTrue),
-                )
-                .arg(arg!(-q --query <query> "Execute query on the file").required(false))
-                .arg(
-                    arg!(-s --summary ... "Summarize the data")
-                        .required(false)
-                        .action(ArgAction::SetTrue),
-                )
-                .arg(
-                    arg!(-t --text ... "Output text instead of binary")
-                        .required(false)
-                        .action(ArgAction::SetTrue),
-                )
-                .arg(arg!(-P --parquet <parquet> "Write output as a parquet file").required(false))
-                .arg(
-                    arg!(-a --head ... "Print the header of the table")
-                        .required(false)
-                        .action(ArgAction::SetTrue),
-                ),
-        )
-        .subcommand(
-            Command::new("schema")
-                .about("Several table schema related utilities")
-                .arg(
-                    arg!(-i --stdin ... "Read from stdin")
-                        .required(false)
-                        .action(ArgAction::SetTrue),
-                )
-                .arg(arg!(-d --delimiter <delimiter> "Column delimiter. Assume ,").required(false))
-                .arg(arg!(-n --name <name> "Table name").required(false))
-                .arg(arg!(-l --strlen <strlen> "Default length for string columns").required(false))
-                .arg(
-                    arg!(-s --summary ... "Summarize the schema")
-                        .required(false)
-                        .action(ArgAction::SetTrue),
-                )
-                .arg(
-                    arg!(-p --postgresql ... "Create a postgresql table with schema")
-                        .required(false)
-                        .action(ArgAction::SetTrue),
-                ),
-        )
-        .subcommand(
-            Command::new("sql")
-                .about("Runs a sql statement on the file")
-                .arg(arg!(-d --delimiter <delimiter> "Column delimiter. Assume ,").required(false))
-                .arg(arg!([statement] "SQL statement"))
-                .arg(
-                    arg!(-t --text ... "Input text instead of binary")
-                        .required(false)
-                        .action(ArgAction::SetTrue),
-                )
-                .arg(arg!(-d --delimiter <delimiter> "Column delimiter").required(false)),
-        )
-        .subcommand(
-            Command::new("print")
-                .about("Pretty prints the table")
-                .arg(arg!(-d --delimiter <delimiter> "Column delimiter. Assume ,").required(false))
-                .arg(
-                    arg!(-t --text ... "Inputs csv instead of binary")
-                        .required(false)
-                        .action(ArgAction::SetTrue),
-                ),
-        )
-        .subcommand(
-            Command::new("rpq")
-                .about("Read parquet file")
-                .arg(arg!([path] "Path to the parquet file"))
-                .arg(arg!(-q --query <query> "Execute query on the file").required(false))
-                .arg(
-                    arg!(-s --summary ... "Summarize the data")
-                        .required(false)
-                        .action(ArgAction::SetTrue),
-                )
-                .arg(
-                    arg!(-i --stdin ... "Read from stdin instead than from a file")
-                        .required(false)
-                        .action(ArgAction::SetTrue),
-                )
-                .arg(
-                    arg!(-t --text ... "Output text instead of binary")
-                        .required(false)
-                        .action(ArgAction::SetTrue),
-                )
-                .arg(
-                    arg!(-P --parquet <parquet> "Write the result as a parquet file")
-                        .required(false),
-                )
-                .arg(
-                    arg!(-a --head ... "Print the header of the table")
-                        .required(false)
-                        .action(ArgAction::SetTrue),
-                ),
-        )
-        .subcommand(
-            Command::new("wpq")
-                .about("Write to a paquet file")
-                .arg(arg!(-d --delimiter <delimiter> "Column delimiter. Assume ,").required(false))
-                .arg(
-                    arg!(-t --text ... "Input text instead of binary")
-                        .required(false)
-                        .action(ArgAction::SetTrue),
-                )
-                .arg(arg!([path] "Path to the new parquet file")),
-        )
+        .subcommand(commands::gen_csv_command())
+        .subcommand(commands::gen_schema_command())
+        .subcommand(commands::gen_sql_command())
+        .subcommand(commands::gen_print_command())
+        .subcommand(commands::gen_rpq_command())
+        .subcommand(commands::gen_wpq_command())
         .get_matches();
-    if let Some(_matches) = matches.subcommand_matches("csv") {
-        let delimiter = match _matches.get_one::<String>("delimiter") {
-            Some(delimiter) => delimiter.as_bytes()[0],
-            None => b',',
-        };
-        let mut ldf = if _matches.get_flag("stdin") {
-            io::load_csv_from_stdin(delimiter)
-        } else {
-            let path = _matches
-                .get_one::<String>("path")
-                .expect("Please, provide a file");
-            io::read_csv(path.to_string(), delimiter)
-        };
-        if let Some(query) = _matches.get_one::<String>("query") {
-            ldf = sql::execute(ldf, query);
-        }
-        if _matches.get_flag("summary") {
-            let df = ldf.collect().expect("Could not collect");
-            println!("{}", df.describe(None));
-        } else if _matches.get_flag("head") {
-            let df = ldf.fetch(5).expect("Could not fetch");
-            println!("{}", df)
-        } else {
-            if _matches.get_flag("text") {
-                io::dump_csv_to_stdout(ldf);
-            } else {
-                if let Some(path) = _matches.get_one::<String>("parquet") {
-                    io::write_parquet(ldf, path.to_string());
-                } else {
-                    io::write_ipc(ldf);
-                }
-            }
-        }
-    } else if let Some(_matches) = matches.subcommand_matches("sql") {
-        let delimiter = match _matches.get_one::<String>("delimiter") {
-            Some(delimiter) => delimiter.as_bytes()[0],
-            None => b',',
-        };
-        if let Some(statement) = _matches.get_one::<String>("statement") {
-            let ldf = if _matches.get_flag("text") {
-                io::load_csv_from_stdin(delimiter)
-            } else {
-                io::read_ipc()
-            };
-            let res = sql::execute(ldf, statement);
-            io::write_ipc(res);
-        } else {
-            io::write_ipc(io::read_ipc());
-        }
-    } else if let Some(_matches) = matches.subcommand_matches("print") {
-        let delimiter = match _matches.get_one::<String>("delimiter") {
-            Some(delimiter) => delimiter.as_bytes()[0],
-            None => b',',
-        };
-        let df = if _matches.get_flag("text") {
-            io::load_csv_from_stdin(delimiter)
-        } else {
-            io::read_ipc()
-        };
-        println!("{}", df.collect().expect("Could not collect"));
-    } else if let Some(_matches) = matches.subcommand_matches("rpq") {
-        let mut ldf = LazyFrame::default();
-        if _matches.get_flag("stdin") {
-            ldf = io::load_parquet_from_stdin();
-        } else if let Some(path) = _matches.get_one::<String>("path") {
-            ldf = io::read_parquet(path.to_string());
-        } else {
-            eprintln!("File not found or not reading from stdin")
-        }
-        if let Some(query) = _matches.get_one::<String>("query") {
-            ldf = sql::execute(ldf, query);
-        }
-        if _matches.get_flag("summary") {
-            let df = ldf.collect().expect("Could not collect");
-            println!("{}", df.describe(None));
-        } else if _matches.get_flag("head") {
-            let df = ldf.fetch(5).expect("Could not fetch");
-            println!("{}", df)
-        } else {
-            if _matches.get_flag("text") {
-                io::dump_csv_to_stdout(ldf);
-            } else {
-                if let Some(path) = _matches.get_one::<String>("parquet") {
-                    io::write_parquet(ldf, path.to_string());
-                } else {
-                    io::write_ipc(ldf);
-                }
-            }
-        }
-    } else if let Some(_matches) = matches.subcommand_matches("wpq") {
-        let delimiter = match _matches.get_one::<String>("delimiter") {
-            Some(delimiter) => delimiter.as_bytes()[0],
-            None => b',',
-        };
-        if let Some(path) = _matches.get_one::<String>("path") {
-            let ldf = if _matches.get_flag("text") {
-                io::load_csv_from_stdin(delimiter)
-            } else {
-                io::read_ipc()
-            };
-            io::write_parquet(ldf, path.to_string());
-        } else {
-            eprintln!("Could now write to parquet");
-        }
-    } else if let Some(_matches) = matches.subcommand_matches("schema") {
-        let delimiter = match _matches.get_one::<String>("delimiter") {
-            Some(delimiter) => delimiter.as_bytes()[0],
-            None => b',',
-        };
-        let ldf = if _matches.get_flag("stdin") {
-            io::load_csv_from_stdin(delimiter)
-        } else {
-            io::read_ipc()
-        };
-
-        if _matches.get_flag("summary") {
-            schema::print_schema(ldf);
-        } else if _matches.get_flag("postgresql") {
-            let name = _matches
-                .get_one::<String>("name")
-                .expect("Please provide a table name");
-            let strlen: u32 = match _matches.get_one::<String>("strlen") {
-                Some(strlen) => strlen.parse::<u32>().unwrap(),
-                None => 128,
-            };
-            schema::print_create(ldf, name.as_str(), strlen);
-        }
+    if let Some(sub_matches) = matches.subcommand_matches("csv") {
+        handlers::handle_csv(sub_matches);
+    } else if let Some(sub_matches) = matches.subcommand_matches("sql") {
+        handlers::handle_sql(sub_matches);
+    } else if let Some(sub_matches) = matches.subcommand_matches("print") {
+        handlers::handle_print(sub_matches);
+    } else if let Some(sub_matches) = matches.subcommand_matches("rpq") {
+        handlers::handle_rpq(sub_matches);
+    } else if let Some(sub_matches) = matches.subcommand_matches("wpq") {
+        handlers::handle_wpq(sub_matches);
+    } else if let Some(sub_matches) = matches.subcommand_matches("schema") {
+        handlers::handle_schema(sub_matches);
     } else {
         println!("No command provided. Please execute dr --help")
     }