Compare commits
30 Commits
Author | SHA1 | Date |
---|---|---|
Guillem Borrell | c92d1ad1df | 11 months ago |
Guillem Borrell | 9033f81b98 | 1 year ago |
Guillem Borrell Nogueras | f4b3a525bb | 1 year ago |
Guillem Borrell Nogueras | c678cc27f3 | 1 year ago |
Guillem Borrell | 8180def799 | 1 year ago |
Guillem Borrell | cabc0e7dfe | 1 year ago |
Guillem Borrell | 02c6b50d00 | 1 year ago |
Guillem Borrell | 83a4138f64 | 1 year ago |
Guillem Borrell | 92fec23932 | 1 year ago |
Guillem Borrell | f0730efcd9 | 1 year ago |
Guillem Borrell | cbf318690c | 1 year ago |
Guillem Borrell | edaea203b7 | 1 year ago |
Guillem Borrell | 951bd82a2b | 1 year ago |
Guillem Borrell | 4e94ad295b | 1 year ago |
Guillem Borrell | d534bdef8d | 1 year ago |
Guillem Borrell | 99d58ff9c3 | 1 year ago |
Guillem Borrell | facae6af40 | 1 year ago |
Guillem Borrell | 1858777c69 | 1 year ago |
Guillem Borrell | 59adb12078 | 1 year ago |
Guillem Borrell | 1e18c9ae9f | 1 year ago |
Guillem Borrell | e4e9b71674 | 1 year ago |
Guillem Borrell | e29b3d18e8 | 1 year ago |
Guillem Borrell | 717da2e1b6 | 1 year ago |
Guillem Borrell Nogueras | 4c26c4c344 | 1 year ago |
Guillem Borrell Nogueras | 98a2a983a9 | 1 year ago |
Guillem Borrell Nogueras | 06a197b07c | 1 year ago |
Guillem Borrell | fc063601c5 | 1 year ago |
Guillem Borrell | 2b18f7b5e3 | 1 year ago |
Guillem Borrell | 3af97c71f0 | 1 year ago |
Guillem Borrell | ada122e5c3 | 1 year ago |
@ -0,0 +1,123 @@
|
||||
use clap::{arg, ArgAction, Command};
|
||||
|
||||
// Generate command line options for the csv command
|
||||
pub fn gen_csv_command() -> Command {
|
||||
Command::new("csv")
|
||||
.about("Read csv, output arrow stream")
|
||||
.arg(arg!([path] "Path to CSV file"))
|
||||
.arg(arg!(-d --delimiter <String> "Column delimiter. Assume ,").required(false))
|
||||
.arg(
|
||||
arg!(-i --stdin ... "Read from stdin")
|
||||
.required(false)
|
||||
.action(ArgAction::SetTrue),
|
||||
)
|
||||
.arg(arg!(-q --query <String> "Execute query on the file").required(false))
|
||||
.arg(
|
||||
arg!(-s --summary ... "Summarize the data")
|
||||
.required(false)
|
||||
.action(ArgAction::SetTrue),
|
||||
)
|
||||
.arg(
|
||||
arg!(-t --text ... "Output text instead of binary")
|
||||
.required(false)
|
||||
.action(ArgAction::SetTrue),
|
||||
)
|
||||
.arg(arg!(-P --parquet <String> "Write output as a parquet file").required(false))
|
||||
.arg(
|
||||
arg!(-a --head ... "Print the header of the table")
|
||||
.required(false)
|
||||
.action(ArgAction::SetTrue),
|
||||
)
|
||||
}
|
||||
|
||||
// Generate command line options for the schema command
|
||||
pub fn gen_schema_command() -> Command {
|
||||
Command::new("schema")
|
||||
.about("Several table schema related utilities")
|
||||
.arg(
|
||||
arg!(-i --stdin ... "Read from stdin")
|
||||
.required(false)
|
||||
.action(ArgAction::SetTrue),
|
||||
)
|
||||
.arg(arg!(-d --delimiter <String> "Column delimiter. Assume ,").required(false))
|
||||
.arg(arg!(-n --name <String> "Table name").required(false))
|
||||
.arg(arg!(-l --strlen <String> "Default length for string columns").required(false))
|
||||
.arg(
|
||||
arg!(-s --summary ... "Summarize the schema")
|
||||
.required(false)
|
||||
.action(ArgAction::SetTrue),
|
||||
)
|
||||
.arg(
|
||||
arg!(-p --postgresql ... "Create a postgresql table with schema")
|
||||
.required(false)
|
||||
.action(ArgAction::SetTrue),
|
||||
)
|
||||
}
|
||||
|
||||
// Generate command line options for the sql command
|
||||
pub fn gen_sql_command() -> Command {
|
||||
Command::new("sql")
|
||||
.about("Runs a sql statement on the file")
|
||||
.arg(arg!(-d --delimiter <String> "Column delimiter. Assume ,").required(false))
|
||||
.arg(arg!([statement] "SQL statement"))
|
||||
.arg(
|
||||
arg!(-t --text ... "Input text instead of binary")
|
||||
.required(false)
|
||||
.action(ArgAction::SetTrue),
|
||||
)
|
||||
.arg(arg!(-d --delimiter <String> "Column delimiter").required(false))
|
||||
}
|
||||
|
||||
// Generate command line options for the rpq command
|
||||
pub fn gen_rpq_command() -> Command {
|
||||
Command::new("rpq")
|
||||
.about("Read parquet file")
|
||||
.arg(arg!([path] "Path to the parquet file"))
|
||||
.arg(arg!(-q --query <String> "Execute query on the file").required(false))
|
||||
.arg(
|
||||
arg!(-s --summary ... "Summarize the data")
|
||||
.required(false)
|
||||
.action(ArgAction::SetTrue),
|
||||
)
|
||||
.arg(
|
||||
arg!(-i --stdin ... "Read from stdin instead than from a file")
|
||||
.required(false)
|
||||
.action(ArgAction::SetTrue),
|
||||
)
|
||||
.arg(
|
||||
arg!(-t --text ... "Output text instead of binary")
|
||||
.required(false)
|
||||
.action(ArgAction::SetTrue),
|
||||
)
|
||||
.arg(arg!(-P --parquet <String> "Write the result as a parquet file").required(false))
|
||||
.arg(
|
||||
arg!(-a --head ... "Print the header of the table")
|
||||
.required(false)
|
||||
.action(ArgAction::SetTrue),
|
||||
)
|
||||
}
|
||||
|
||||
// Generate command line options for the wpq command
|
||||
pub fn gen_wpq_command() -> Command {
|
||||
Command::new("wpq")
|
||||
.about("Write to a paquet file")
|
||||
.arg(arg!(-d --delimiter <String> "Column delimiter. Assume ,").required(false))
|
||||
.arg(
|
||||
arg!(-t --text ... "Input text instead of binary")
|
||||
.required(false)
|
||||
.action(ArgAction::SetTrue),
|
||||
)
|
||||
.arg(arg!([path] "Path to the new parquet file"))
|
||||
}
|
||||
|
||||
// Generate command line options for the print command
|
||||
pub fn gen_print_command() -> Command {
|
||||
Command::new("print")
|
||||
.about("Pretty prints the table")
|
||||
.arg(arg!(-d --delimiter <String> "Column delimiter. Assume ,").required(false))
|
||||
.arg(
|
||||
arg!(-t --text ... "Inputs csv instead of binary")
|
||||
.required(false)
|
||||
.action(ArgAction::SetTrue),
|
||||
)
|
||||
}
|
@ -0,0 +1,150 @@
|
||||
use crate::io;
|
||||
use crate::schema;
|
||||
use crate::sql;
|
||||
use clap::ArgMatches;
|
||||
use polars_lazy::prelude::LazyFrame;
|
||||
|
||||
// Handle csv command
|
||||
pub fn handle_csv(matches: &ArgMatches) {
|
||||
let delimiter = match matches.get_one::<String>("delimiter") {
|
||||
Some(delimiter) => delimiter.as_bytes()[0],
|
||||
None => b',',
|
||||
};
|
||||
let mut ldf = if matches.get_flag("stdin") {
|
||||
io::load_csv_from_stdin(delimiter)
|
||||
} else {
|
||||
let path = matches
|
||||
.get_one::<String>("path")
|
||||
.expect("Please, provide a file");
|
||||
io::read_csv(path.to_string(), delimiter)
|
||||
};
|
||||
if let Some(query) = matches.get_one::<String>("query") {
|
||||
ldf = sql::execute(ldf, query);
|
||||
}
|
||||
if matches.get_flag("summary") {
|
||||
let df = ldf.collect().expect("Could not collect");
|
||||
println!("{:?}", df.describe(None));
|
||||
} else if matches.get_flag("head") {
|
||||
let df = ldf.fetch(5).expect("Could not fetch");
|
||||
println!("{}", df)
|
||||
} else {
|
||||
if matches.get_flag("text") {
|
||||
io::dump_csv_to_stdout(ldf);
|
||||
} else {
|
||||
if let Some(path) = matches.get_one::<String>("parquet") {
|
||||
io::write_parquet(ldf, path.to_string());
|
||||
} else {
|
||||
io::write_ipc(ldf);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Handle the SQL command
|
||||
pub fn handle_sql(matches: &ArgMatches) {
|
||||
let delimiter = match matches.get_one::<String>("delimiter") {
|
||||
Some(delimiter) => delimiter.as_bytes()[0],
|
||||
None => b',',
|
||||
};
|
||||
if let Some(statement) = matches.get_one::<String>("statement") {
|
||||
let ldf = if matches.get_flag("text") {
|
||||
io::load_csv_from_stdin(delimiter)
|
||||
} else {
|
||||
io::read_ipc()
|
||||
};
|
||||
let res = sql::execute(ldf, statement);
|
||||
io::write_ipc(res);
|
||||
} else {
|
||||
io::write_ipc(io::read_ipc());
|
||||
}
|
||||
}
|
||||
|
||||
// Handle the print command
|
||||
pub fn handle_print(matches: &ArgMatches) {
|
||||
let delimiter = match matches.get_one::<String>("delimiter") {
|
||||
Some(delimiter) => delimiter.as_bytes()[0],
|
||||
None => b',',
|
||||
};
|
||||
let df = if matches.get_flag("text") {
|
||||
io::load_csv_from_stdin(delimiter)
|
||||
} else {
|
||||
io::read_ipc()
|
||||
};
|
||||
println!("{}", df.collect().expect("Could not collect"));
|
||||
}
|
||||
|
||||
// Handle the rpq command
|
||||
pub fn handle_rpq(matches: &ArgMatches) {
|
||||
let mut ldf = LazyFrame::default();
|
||||
if matches.get_flag("stdin") {
|
||||
ldf = io::load_parquet_from_stdin();
|
||||
} else if let Some(path) = matches.get_one::<String>("path") {
|
||||
ldf = io::read_parquet(path.to_string());
|
||||
} else {
|
||||
eprintln!("File not found or not reading from stdin")
|
||||
}
|
||||
if let Some(query) = matches.get_one::<String>("query") {
|
||||
ldf = sql::execute(ldf, query);
|
||||
}
|
||||
if matches.get_flag("summary") {
|
||||
let df = ldf.collect().expect("Could not collect");
|
||||
println!("{:?}", df.describe(None));
|
||||
} else if matches.get_flag("head") {
|
||||
let df = ldf.fetch(5).expect("Could not fetch");
|
||||
println!("{}", df)
|
||||
} else {
|
||||
if matches.get_flag("text") {
|
||||
io::dump_csv_to_stdout(ldf);
|
||||
} else {
|
||||
if let Some(path) = matches.get_one::<String>("parquet") {
|
||||
io::write_parquet(ldf, path.to_string());
|
||||
} else {
|
||||
io::write_ipc(ldf);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Handle the wpq command
|
||||
pub fn handle_wpq(matches: &ArgMatches) {
|
||||
let delimiter = match matches.get_one::<String>("delimiter") {
|
||||
Some(delimiter) => delimiter.as_bytes()[0],
|
||||
None => b',',
|
||||
};
|
||||
if let Some(path) = matches.get_one::<String>("path") {
|
||||
let ldf = if matches.get_flag("text") {
|
||||
io::load_csv_from_stdin(delimiter)
|
||||
} else {
|
||||
io::read_ipc()
|
||||
};
|
||||
io::write_parquet(ldf, path.to_string());
|
||||
} else {
|
||||
eprintln!("Could now write to parquet");
|
||||
}
|
||||
}
|
||||
|
||||
// Handle the schema command
|
||||
pub fn handle_schema(matches: &ArgMatches) {
|
||||
let delimiter = match matches.get_one::<String>("delimiter") {
|
||||
Some(delimiter) => delimiter.as_bytes()[0],
|
||||
None => b',',
|
||||
};
|
||||
let ldf = if matches.get_flag("stdin") {
|
||||
io::load_csv_from_stdin(delimiter)
|
||||
} else {
|
||||
io::read_ipc()
|
||||
};
|
||||
|
||||
if matches.get_flag("summary") {
|
||||
schema::print_schema(ldf);
|
||||
} else if matches.get_flag("postgresql") {
|
||||
let name = matches
|
||||
.get_one::<String>("name")
|
||||
.expect("Please provide a table name");
|
||||
let strlen: u32 = match matches.get_one::<String>("strlen") {
|
||||
Some(strlen) => strlen.parse::<u32>().unwrap(),
|
||||
None => 128,
|
||||
};
|
||||
schema::print_create(ldf, name.as_str(), strlen);
|
||||
}
|
||||
}
|
@ -0,0 +1,58 @@
|
||||
use polars_lazy::prelude::*;
|
||||
use sea_query::table::ColumnType;
|
||||
use sea_query::*;
|
||||
|
||||
pub fn print_schema(ldf: LazyFrame) {
|
||||
let schema = ldf.schema().expect("Could not retreive schema");
|
||||
for f in schema.iter_fields() {
|
||||
let mut unnamed_cols_counter = 0;
|
||||
let d = f.data_type().to_string();
|
||||
let n = if f.name.is_empty() {
|
||||
unnamed_cols_counter += 1;
|
||||
format!("Column{}", unnamed_cols_counter)
|
||||
} else {
|
||||
f.name
|
||||
};
|
||||
|
||||
println!("{n} ({d})");
|
||||
}
|
||||
}
|
||||
|
||||
pub fn print_create(ldf: LazyFrame, table_name: &str, default_strlen: u32) {
|
||||
let schema = ldf.schema().expect("Could not retreive schema");
|
||||
// Create empty table
|
||||
let mut statements = vec![Table::create()
|
||||
.table(Alias::new(table_name))
|
||||
.if_not_exists()
|
||||
.to_string(PostgresQueryBuilder)];
|
||||
|
||||
// Alter table adding fields one by one
|
||||
let mut unnamed_cols_counter = 0;
|
||||
for f in schema.iter_fields() {
|
||||
let dtype = match f.data_type().to_string().as_str() {
|
||||
"i64" => ColumnType::Integer,
|
||||
"f64" => ColumnType::Float,
|
||||
"str" => ColumnType::String(Some(default_strlen)),
|
||||
"bool" => ColumnType::Boolean,
|
||||
&_ => todo!("Datatype {} not supported", f.data_type().to_string()),
|
||||
};
|
||||
|
||||
let name = if f.name.is_empty() {
|
||||
unnamed_cols_counter += 1;
|
||||
format!("Column{}", unnamed_cols_counter)
|
||||
} else {
|
||||
f.name
|
||||
};
|
||||
|
||||
let table = Table::alter()
|
||||
.table(Alias::new(table_name))
|
||||
.add_column(&mut ColumnDef::new_with_type(Alias::new(&name), dtype))
|
||||
.to_owned();
|
||||
statements.push(table.to_string(PostgresQueryBuilder));
|
||||
}
|
||||
|
||||
// Finallyls print all statements
|
||||
for statement in statements {
|
||||
println!("{};", statement);
|
||||
}
|
||||
}
|
@ -1,15 +1,10 @@
|
||||
use polars::prelude::LazyFrame;
|
||||
use polars::sql::SQLContext;
|
||||
use polars_sql::SQLContext;
|
||||
use polars_lazy::prelude::LazyFrame;
|
||||
|
||||
pub fn execute(ldf: LazyFrame, statement: &String) -> LazyFrame {
|
||||
let mut context = SQLContext::try_new().expect("Could not create context");
|
||||
context.register("this", ldf);
|
||||
|
||||
match context.execute(statement) {
|
||||
Ok(res) => res,
|
||||
Err(e) => {
|
||||
eprintln!("Query execution error {e}");
|
||||
LazyFrame::default()
|
||||
}
|
||||
}
|
||||
context
|
||||
.execute(statement)
|
||||
.expect("Could not execute statement")
|
||||
}
|
||||
|
Loading…
Reference in new issue