First attempt at db insertion support

This commit is contained in:
Guillem Borrell 2023-01-17 11:28:49 +00:00
parent fc063601c5
commit 717da2e1b6
3 changed files with 74 additions and 2 deletions

View file

@ -1,7 +1,7 @@
[package]
name = "dr"
description = "Command-line data file processing in Rust"
version = "0.5.3"
version = "0.6.0"
edition = "2021"
include = [
"**/*.rs",
@ -18,3 +18,4 @@ polars-lazy = {"version" = "0.26", "features" = ["parquet", "ipc", "csv-file"]}
polars-core = {"version" = "0.26", "features" = ["describe", "fmt"]}
polars-io = {"version" = "0.26", "features" = ["ipc_streaming"]}
polars-sql = {"version" = "0.2.2"}
sea-query = {"version" = "0.28"}

View file

@ -1,4 +1,5 @@
mod io;
mod schema;
mod sql;
use clap::{arg, command, ArgAction, Command};
use polars_lazy::prelude::*;
@ -28,6 +29,22 @@ fn main() {
.action(ArgAction::SetTrue),
),
)
.subcommand(
Command::new("schema")
.about("Several table schema related utilities")
.arg(arg!(-n --name <String> "Table name").required(false))
.arg(arg!(-l --strlen <String> "Default length for string columns").required(false))
.arg(
arg!(-s --summary ... "Summarize the schema")
.required(false)
.action(ArgAction::SetTrue),
)
.arg(
arg!(-p --postgresql ... "Create a postgresql table with schema")
.required(false)
.action(ArgAction::SetTrue),
),
)
.subcommand(
Command::new("sql")
.about("Runs a sql statement on the file")
@ -87,7 +104,6 @@ fn main() {
.arg(arg!([path] "Path to the new parquet file")),
)
.get_matches();
if let Some(_matches) = matches.subcommand_matches("csv") {
if let Some(path) = _matches.get_one::<String>("path") {
let mut ldf = io::read_csv(path.to_string());
@ -171,6 +187,19 @@ fn main() {
} else {
eprintln!("Could now write to parquet");
}
} else if let Some(_matches) = matches.subcommand_matches("schema") {
if _matches.get_flag("summary") {
schema::print_schema(io::read_ipc());
} else if _matches.get_flag("postgresql") {
let name = _matches
.get_one::<String>("name")
.expect("Please provide a table name");
let strlen: u32 = match _matches.get_one::<String>("strlen") {
Some(strlen) => strlen.parse::<u32>().unwrap(),
None => 128,
};
schema::print_create(io::read_ipc(), name.as_str(), strlen);
}
} else {
println!("No command provided. Please execute dr --help")
}

42
src/schema.rs Normal file
View file

@ -0,0 +1,42 @@
use polars_lazy::prelude::*;
use sea_query::table::ColumnType;
use sea_query::*;
pub fn print_schema(ldf: LazyFrame) {
let schema = ldf.schema().expect("Could not retreive schema");
for f in schema.iter_fields() {
let n = f.name();
let d = f.data_type().to_string();
println!("{n} ({d})");
}
}
pub fn print_create(ldf: LazyFrame, table_name: &str, default_strlen: u32) {
let schema = ldf.schema().expect("Could not retreive schema");
// Create empty table
let mut statements = vec![Table::create()
.table(Alias::new(table_name))
.if_not_exists()
.to_string(PostgresQueryBuilder)];
// Alter table adding fields one by one
for f in schema.iter_fields() {
let dtype = match f.data_type().to_string().as_str() {
"i64" => ColumnType::Integer,
"f64" => ColumnType::Float,
"str" => ColumnType::String(Some(default_strlen)),
"bool" => ColumnType::Boolean,
&_ => todo!("Datatype {} not supported", f.data_type().to_string()),
};
let table = Table::alter()
.table(Alias::new(table_name))
.add_column(&mut ColumnDef::new_with_type(Alias::new(&f.name), dtype))
.to_owned();
statements.push(table.to_string(PostgresQueryBuilder));
}
// Finall print all statements
for statement in statements {
println!("{};", statement);
}
}