Compare commits
2 commits
4c26c4c344
...
e29b3d18e8
Author | SHA1 | Date | |
---|---|---|---|
e29b3d18e8 | |||
717da2e1b6 |
|
@ -1,7 +1,7 @@
|
||||||
[package]
|
[package]
|
||||||
name = "dr"
|
name = "dr"
|
||||||
description = "Command-line data file processing in Rust"
|
description = "Command-line data file processing in Rust"
|
||||||
version = "0.5.4"
|
version = "0.6.0"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
include = [
|
include = [
|
||||||
"**/*.rs",
|
"**/*.rs",
|
||||||
|
@ -18,3 +18,4 @@ polars-lazy = {"version" = "0.26", "features" = ["parquet", "ipc", "csv-file"]}
|
||||||
polars-core = {"version" = "0.26", "features" = ["describe", "fmt"]}
|
polars-core = {"version" = "0.26", "features" = ["describe", "fmt"]}
|
||||||
polars-io = {"version" = "0.26", "features" = ["ipc_streaming"]}
|
polars-io = {"version" = "0.26", "features" = ["ipc_streaming"]}
|
||||||
polars-sql = {"version" = "0.2.2"}
|
polars-sql = {"version" = "0.2.2"}
|
||||||
|
sea-query = {"version" = "0.28"}
|
||||||
|
|
31
src/main.rs
31
src/main.rs
|
@ -1,4 +1,5 @@
|
||||||
mod io;
|
mod io;
|
||||||
|
mod schema;
|
||||||
mod sql;
|
mod sql;
|
||||||
use clap::{arg, command, ArgAction, Command};
|
use clap::{arg, command, ArgAction, Command};
|
||||||
use polars_lazy::prelude::*;
|
use polars_lazy::prelude::*;
|
||||||
|
@ -28,6 +29,22 @@ fn main() {
|
||||||
.action(ArgAction::SetTrue),
|
.action(ArgAction::SetTrue),
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
.subcommand(
|
||||||
|
Command::new("schema")
|
||||||
|
.about("Several table schema related utilities")
|
||||||
|
.arg(arg!(-n --name <String> "Table name").required(false))
|
||||||
|
.arg(arg!(-l --strlen <String> "Default length for string columns").required(false))
|
||||||
|
.arg(
|
||||||
|
arg!(-s --summary ... "Summarize the schema")
|
||||||
|
.required(false)
|
||||||
|
.action(ArgAction::SetTrue),
|
||||||
|
)
|
||||||
|
.arg(
|
||||||
|
arg!(-p --postgresql ... "Create a postgresql table with schema")
|
||||||
|
.required(false)
|
||||||
|
.action(ArgAction::SetTrue),
|
||||||
|
),
|
||||||
|
)
|
||||||
.subcommand(
|
.subcommand(
|
||||||
Command::new("sql")
|
Command::new("sql")
|
||||||
.about("Runs a sql statement on the file")
|
.about("Runs a sql statement on the file")
|
||||||
|
@ -87,7 +104,6 @@ fn main() {
|
||||||
.arg(arg!([path] "Path to the new parquet file")),
|
.arg(arg!([path] "Path to the new parquet file")),
|
||||||
)
|
)
|
||||||
.get_matches();
|
.get_matches();
|
||||||
|
|
||||||
if let Some(_matches) = matches.subcommand_matches("csv") {
|
if let Some(_matches) = matches.subcommand_matches("csv") {
|
||||||
if let Some(path) = _matches.get_one::<String>("path") {
|
if let Some(path) = _matches.get_one::<String>("path") {
|
||||||
let mut ldf = io::read_csv(path.to_string());
|
let mut ldf = io::read_csv(path.to_string());
|
||||||
|
@ -171,6 +187,19 @@ fn main() {
|
||||||
} else {
|
} else {
|
||||||
eprintln!("Could now write to parquet");
|
eprintln!("Could now write to parquet");
|
||||||
}
|
}
|
||||||
|
} else if let Some(_matches) = matches.subcommand_matches("schema") {
|
||||||
|
if _matches.get_flag("summary") {
|
||||||
|
schema::print_schema(io::read_ipc());
|
||||||
|
} else if _matches.get_flag("postgresql") {
|
||||||
|
let name = _matches
|
||||||
|
.get_one::<String>("name")
|
||||||
|
.expect("Please provide a table name");
|
||||||
|
let strlen: u32 = match _matches.get_one::<String>("strlen") {
|
||||||
|
Some(strlen) => strlen.parse::<u32>().unwrap(),
|
||||||
|
None => 128,
|
||||||
|
};
|
||||||
|
schema::print_create(io::read_ipc(), name.as_str(), strlen);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
println!("No command provided. Please execute dr --help")
|
println!("No command provided. Please execute dr --help")
|
||||||
}
|
}
|
||||||
|
|
42
src/schema.rs
Normal file
42
src/schema.rs
Normal file
|
@ -0,0 +1,42 @@
|
||||||
|
use polars_lazy::prelude::*;
|
||||||
|
use sea_query::table::ColumnType;
|
||||||
|
use sea_query::*;
|
||||||
|
|
||||||
|
pub fn print_schema(ldf: LazyFrame) {
|
||||||
|
let schema = ldf.schema().expect("Could not retreive schema");
|
||||||
|
for f in schema.iter_fields() {
|
||||||
|
let n = f.name();
|
||||||
|
let d = f.data_type().to_string();
|
||||||
|
println!("{n} ({d})");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn print_create(ldf: LazyFrame, table_name: &str, default_strlen: u32) {
|
||||||
|
let schema = ldf.schema().expect("Could not retreive schema");
|
||||||
|
// Create empty table
|
||||||
|
let mut statements = vec![Table::create()
|
||||||
|
.table(Alias::new(table_name))
|
||||||
|
.if_not_exists()
|
||||||
|
.to_string(PostgresQueryBuilder)];
|
||||||
|
|
||||||
|
// Alter table adding fields one by one
|
||||||
|
for f in schema.iter_fields() {
|
||||||
|
let dtype = match f.data_type().to_string().as_str() {
|
||||||
|
"i64" => ColumnType::Integer,
|
||||||
|
"f64" => ColumnType::Float,
|
||||||
|
"str" => ColumnType::String(Some(default_strlen)),
|
||||||
|
"bool" => ColumnType::Boolean,
|
||||||
|
&_ => todo!("Datatype {} not supported", f.data_type().to_string()),
|
||||||
|
};
|
||||||
|
let table = Table::alter()
|
||||||
|
.table(Alias::new(table_name))
|
||||||
|
.add_column(&mut ColumnDef::new_with_type(Alias::new(&f.name), dtype))
|
||||||
|
.to_owned();
|
||||||
|
statements.push(table.to_string(PostgresQueryBuilder));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Finall print all statements
|
||||||
|
for statement in statements {
|
||||||
|
println!("{};", statement);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in a new issue