tradingbot/src/database_control.rs
2024-03-24 21:53:39 +09:00

767 lines
23 KiB
Rust

#![allow(unused)]
#![allow(warnings)]
use crate::DB_URL;
use sqlx::{mysql::*, Connection, Error, Executor, FromRow, Row};
use std::any::Any;
use tokio::time::{sleep, Duration};
/// Reports whether a table named `table_name` exists in the `tradingbot` schema.
///
/// The name is sent as a bound parameter instead of being spliced into the SQL
/// text, so a name containing a quote character can no longer break or alter
/// the statement.
///
/// Connecting is retried every 200 ms until it succeeds.
///
/// # Panics
///
/// Panics if the lookup query itself fails.
pub async fn exists_table(table_name: &String) -> bool {
    const LOOKUP_SQL: &str = "SELECT 1 FROM information_schema.tables \
        WHERE table_schema = 'tradingbot' AND table_name = ?";

    // Keep trying until the database accepts a connection.
    let mut conn = loop {
        match sqlx::mysql::MySqlConnection::connect(DB_URL).await {
            Ok(established) => break established,
            Err(_) => sleep(Duration::from_millis(200)).await,
        }
    };

    let rows = sqlx::query(LOOKUP_SQL)
        .bind(table_name.as_str())
        .fetch_all(&mut conn)
        .await
        .expect("failed to query information_schema.tables");
    // The answer is already known, so a failed close is deliberately ignored.
    let _ = conn.close().await;
    !rows.is_empty()
}
/// Reports whether `table_name` holds at least one row, optionally restricted
/// by `condition` (the text that follows `WHERE`, without the keyword itself).
///
/// Both the connection attempt and the query are retried every 200 ms until
/// they succeed, so this does not return while the database keeps rejecting
/// either of them.
pub async fn exists_record(table_name: &String, condition: &Option<String>) -> bool {
    #[derive(Debug, FromRow)]
    struct Success {
        success: i32,
    }

    let where_clause = match condition {
        Some(filter) => format!(" WHERE {}", filter),
        None => String::new(),
    };
    let sql = format!(
        "SELECT EXISTS (SELECT * FROM {}{} LIMIT 1) as success;",
        table_name, where_clause
    );

    // Keep trying until the database accepts a connection.
    let mut conn = loop {
        match sqlx::mysql::MySqlConnection::connect(DB_URL).await {
            Ok(established) => break established,
            Err(_) => sleep(Duration::from_millis(200)).await,
        }
    };

    // Same policy for the query: wait 200 ms after every failure and try again.
    let found = loop {
        let attempt = sqlx::query_as::<_, Success>(&sql)
            .fetch_one(&mut conn)
            .await;
        match attempt {
            Ok(row) => break row,
            Err(_) => sleep(Duration::from_millis(200)).await,
        }
    };

    let _ = conn.close().await;
    found.success != 0
}
// make a new table. If the job succeeds it returns true, or false.
// columns_vec: Vec<(column name, type, option)>
pub async fn new_table(
table_name: &String,
columns_vec: &Vec<(&str, &str, Option<&str>)>,
table_condition: &Option<&str>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// query building.
let mut query = String::from("CREATE TABLE IF NOT EXISTS ");
query.push_str(table_name);
query.push('(');
for element in columns_vec.iter() {
query.push_str(element.0);
query.push(' ');
query.push_str(element.1);
query.push(' ');
match element.2 {
None => {}
_ => {
if (element.2.unwrap().contains("UNSIGNED")) {
query.push_str("UNSIGNED ");
}
if (element.2.unwrap().contains("UN")) {
query.push_str("UNSIGNED ");
}
if (element.2.unwrap().contains("PK")) {
query.push_str("PRIMARY KEY ");
}
if (element.2.unwrap().contains("NOTNULL")) {
query.push_str("NOT NULL");
}
if (element.2.unwrap().contains("AI")) {
query.push_str("AUTO_INCREMENT ");
}
}
}
query.push(',');
}
query.pop();
query.push_str(")");
match table_condition {
Some(T) => {
query.push(' ');
query.push_str(T);
}
None => {}
}
query.push(';');
let mut conn_result = sqlx::mysql::MySqlConnection::connect(DB_URL).await;
//retry connection until it will be done.
while conn_result.is_err() {
sleep(Duration::from_millis(200)).await;
conn_result = sqlx::mysql::MySqlConnection::connect(DB_URL).await;
}
let mut conn = conn_result.unwrap();
let mut query_result = sqlx::query(&query).execute(&mut conn).await;
while let Err(e) = query_result {
sleep(Duration::from_millis(200)).await;
query_result = sqlx::query(&query).execute(&mut conn).await;
}
conn.close().await?;
Ok(())
}
/// Drops `table_name` from the `tradingbot` schema.
///
/// Connecting is retried every 200 ms until it succeeds; the `DROP TABLE`
/// statement itself is executed once.
///
/// # Errors
///
/// Returns the error from closing the connection if that fails, otherwise the
/// outcome of the `DROP TABLE` statement.
pub async fn drop_table(table_name: &String) -> Result<MySqlQueryResult, Error> {
    let statement = format!("DROP TABLE `tradingbot`.`{}`;", table_name);

    // Keep trying until the database accepts a connection.
    let mut conn = loop {
        match sqlx::mysql::MySqlConnection::connect(DB_URL).await {
            Ok(established) => break established,
            Err(_) => sleep(Duration::from_millis(200)).await,
        }
    };

    let outcome = sqlx::query(&statement).execute(&mut conn).await;
    conn.close().await?;
    outcome
}
/// Copies every row of `source_table` into `destination_table` with a single
/// `INSERT INTO ... (SELECT * FROM ...)` statement.
///
/// Both tables must have the same structure.
///
/// Connecting and executing are each retried every 200 ms until they succeed.
///
/// # Errors
///
/// Returns an error when closing the connection fails.
pub async fn copy_table_data(
    source_table: &String,
    destination_table: &String,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let query = format!(
        "INSERT INTO {} (SELECT * FROM {});",
        destination_table, source_table
    );

    // Keep trying until the database accepts a connection.
    let mut conn = loop {
        match sqlx::mysql::MySqlConnection::connect(DB_URL).await {
            Ok(established) => break established,
            Err(_) => sleep(Duration::from_millis(200)).await,
        }
    };

    // Wait before each retry, so a failure is always followed by a back-off
    // and a success returns immediately.
    while sqlx::query(&query).execute(&mut conn).await.is_err() {
        sleep(Duration::from_millis(200)).await;
    }

    conn.close().await?;
    Ok(())
}
/// Inserts one row into `table_name`.
///
/// * `columns` - column names, e.g. `["name", "class", "age"]`.
/// * `values` - one value per column, e.g. `["Kim", "blue", "7"]`. The strings
///   `"true"` and `"false"` are stored as `"1"` and `"0"`.
///
/// Values are sent as bound parameters, so quotes or other special characters
/// inside a value cannot break the statement. Table and column names are still
/// interpolated as given.
///
/// Connecting is retried every 200 ms until it succeeds; the insert itself is
/// executed once.
///
/// # Errors
///
/// Returns the error from closing the connection if that fails, otherwise the
/// outcome of the `INSERT` statement.
pub async fn insert_one_record(
    table_name: &String,
    columns: &Vec<&str>,
    values: &Vec<String>,
) -> Result<MySqlQueryResult, Error> {
    let mut query = String::from("INSERT INTO ");
    query.push_str(table_name);
    query.push('(');
    for column in columns.iter() {
        query.push_str(column);
        query.push(',');
    }
    // Removes the trailing comma. With an empty list this removes the opening
    // parenthesis instead, leaving SQL the server rejects.
    query.pop();
    query.push_str(") VALUES(");
    for _ in values.iter() {
        query.push_str("?,");
    }
    query.pop();
    query.push_str(");");

    // Keep trying until the database accepts a connection.
    let mut conn = loop {
        match sqlx::mysql::MySqlConnection::connect(DB_URL).await {
            Ok(established) => break established,
            Err(_) => sleep(Duration::from_millis(200)).await,
        }
    };

    let mut statement = sqlx::query(&query);
    for value in values.iter() {
        let bound: &str = match value.as_str() {
            "true" => "1",
            "false" => "0",
            other => other,
        };
        statement = statement.bind(bound);
    }

    let query_result = statement.execute(&mut conn).await;
    conn.close().await?;
    query_result
}
/// Copies the listed `columns` of rows from `src_table` into `dest_table`
/// using `INSERT INTO ... SELECT ...`.
///
/// * `columns` - comma-separated column list, used for both the insert and
///   the select side.
/// * `condition` - optional text appended after the source table name,
///   separated by a space and followed by `;`.
///
/// Connecting is retried every 200 ms until it succeeds; the statement is
/// executed once.
///
/// # Errors
///
/// Returns the error from closing the connection if that fails, otherwise the
/// outcome of the statement.
pub async fn copy_record(
    src_table: &String,
    dest_table: &String,
    columns: &str,
    condition: &Option<String>,
) -> Result<MySqlQueryResult, Error> {
    let mut statement = format!(
        "INSERT INTO {} ({}) SELECT {} FROM {}",
        dest_table, columns, columns, src_table
    );
    if let Some(filter) = condition {
        statement.push_str(&format!(" {};", filter));
    }

    // Keep trying until the database accepts a connection.
    let mut conn = loop {
        match sqlx::mysql::MySqlConnection::connect(DB_URL).await {
            Ok(established) => break established,
            Err(_) => sleep(Duration::from_millis(200)).await,
        }
    };

    let outcome = sqlx::query(&statement).execute(&mut conn).await;
    conn.close().await?;
    outcome
}
/// Inserts several rows into `table_name` with a single `INSERT` statement.
///
/// * `columns` - column names, e.g. `["name", "class", "age"]`.
/// * `values` - one inner vector per row, each as long as `columns`, e.g.
///   `[["Kim", "blue", "7"], ["Lee", "red", "9"]]`. The strings `"true"` and
///   `"false"` are written as `'1'` and `'0'`.
///
/// Values are placed between single quotes without escaping, so they must not
/// contain a single quote themselves.
///
/// An empty `values` is a no-op: nothing is sent to the database.
///
/// Connecting and executing are each retried every 200 ms until they succeed.
///
/// # Errors
///
/// Returns an error when closing the connection fails.
pub async fn insert_records(
    table_name: &String,
    columns: &Vec<&str>,
    values: &Vec<Vec<String>>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // With no rows the statement would end in a bare `VALUES`, which always
    // fails and would keep the retry loop below spinning forever.
    if values.is_empty() {
        return Ok(());
    }

    let mut query = String::from("INSERT INTO ");
    query.push_str(table_name);
    query.push('(');
    for column in columns.iter() {
        query.push_str(column);
        query.push(',');
    }
    query.pop(); // trailing comma
    query.push_str(") VALUES ");
    for row in values.iter() {
        query.push('(');
        for value in row.iter() {
            query.push('\'');
            query.push_str(match value.as_str() {
                "true" => "1",
                "false" => "0",
                other => other,
            });
            query.push_str("',");
        }
        query.pop(); // trailing comma inside the row
        query.push_str("),");
    }
    query.pop(); // trailing comma after the last row
    query.push(';');

    // Keep trying until the database accepts a connection.
    let mut conn = loop {
        match sqlx::mysql::MySqlConnection::connect(DB_URL).await {
            Ok(established) => break established,
            Err(_) => sleep(Duration::from_millis(200)).await,
        }
    };

    // Wait before each retry, so a failure is always followed by a back-off
    // and a success returns immediately.
    while sqlx::query(&query).execute(&mut conn).await.is_err() {
        sleep(Duration::from_millis(200)).await;
    }

    conn.close().await?;
    Ok(())
}
/// Updates the rows of `table_name` that match every entry of `condition`.
///
/// * `record` - `(column name, new value)` pairs for the `SET` clause. The
///   values `"true"` and `"false"` are written as the numeric literals `1`
///   and `0`.
/// * `condition` - `(column name, value)` pairs combined with `AND` in the
///   `WHERE` clause.
///
/// All other values are sent as bound parameters, so quotes or other special
/// characters inside a value cannot break the statement. Table and column
/// names are still interpolated as given.
///
/// Connecting is retried every 200 ms until it succeeds; the update is
/// executed once.
///
/// # Errors
///
/// Returns the error from closing the connection if that fails, otherwise the
/// outcome of the `UPDATE` statement. An empty `record` or `condition` yields
/// SQL the server rejects.
pub async fn update_record(
    table_name: &String,
    record: &Vec<(&str, &str)>,
    condition: &Vec<(&str, &str)>,
) -> Result<MySqlQueryResult, Error> {
    let assignments = record
        .iter()
        .map(|(column, value)| match *value {
            "true" => format!("{}=1", column),
            "false" => format!("{}=0", column),
            _ => format!("{}=?", column),
        })
        .collect::<Vec<_>>()
        .join(",");
    let filters = condition
        .iter()
        .map(|(column, _)| format!("{}=?", column))
        .collect::<Vec<_>>()
        .join(" AND ");
    let query = format!(
        "UPDATE {} SET {} WHERE {};",
        table_name, assignments, filters
    );

    // Keep trying until the database accepts a connection.
    let mut conn = loop {
        match sqlx::mysql::MySqlConnection::connect(DB_URL).await {
            Ok(established) => break established,
            Err(_) => sleep(Duration::from_millis(200)).await,
        }
    };

    // Bind in placeholder order: SET values first (booleans were inlined
    // above and have no placeholder), then the WHERE values.
    let mut statement = sqlx::query(&query);
    for (_, value) in record.iter() {
        if *value != "true" && *value != "false" {
            statement = statement.bind(*value);
        }
    }
    for (_, value) in condition.iter() {
        statement = statement.bind(*value);
    }

    let query_result = statement.execute(&mut conn).await;
    conn.close().await?;
    query_result
}
/// Updates the rows of `table_name` that match every entry of `condition`.
///
/// Takes owned `String` pairs rather than borrowed `&str` pairs.
///
/// * `record` - `(column name, new value)` pairs for the `SET` clause. The
///   values `"true"` and `"false"` are written as the numeric literals `1`
///   and `0`.
/// * `condition` - `(column name, value)` pairs combined with `AND` in the
///   `WHERE` clause.
///
/// All other values are sent as bound parameters, so quotes or other special
/// characters inside a value cannot break the statement. Table and column
/// names are still interpolated as given.
///
/// Connecting is retried every 200 ms until it succeeds; the update is
/// executed once.
///
/// # Errors
///
/// Returns the error from closing the connection if that fails, otherwise the
/// outcome of the `UPDATE` statement. An empty `record` or `condition` yields
/// SQL the server rejects.
pub async fn update_record2(
    table_name: &String,
    record: &Vec<(String, String)>,
    condition: &Vec<(String, String)>,
) -> Result<MySqlQueryResult, Error> {
    let assignments = record
        .iter()
        .map(|(column, value)| match value.as_str() {
            "true" => format!("{}=1", column),
            "false" => format!("{}=0", column),
            _ => format!("{}=?", column),
        })
        .collect::<Vec<_>>()
        .join(",");
    let filters = condition
        .iter()
        .map(|(column, _)| format!("{}=?", column))
        .collect::<Vec<_>>()
        .join(" AND ");
    let query = format!(
        "UPDATE {} SET {} WHERE {};",
        table_name, assignments, filters
    );

    // Keep trying until the database accepts a connection.
    let mut conn = loop {
        match sqlx::mysql::MySqlConnection::connect(DB_URL).await {
            Ok(established) => break established,
            Err(_) => sleep(Duration::from_millis(200)).await,
        }
    };

    // Bind in placeholder order: SET values first (booleans were inlined
    // above and have no placeholder), then the WHERE values.
    let mut statement = sqlx::query(&query);
    for (_, value) in record.iter() {
        if value.as_str() != "true" && value.as_str() != "false" {
            statement = statement.bind(value.as_str());
        }
    }
    for (_, value) in condition.iter() {
        statement = statement.bind(value.as_str());
    }

    let query_result = statement.execute(&mut conn).await;
    conn.close().await?;
    query_result
}
/// Updates the rows of `table_name` that match every entry of `condition`,
/// writing the `SET` values verbatim.
///
/// * `record` - `(column name, value)` pairs. Each value is inserted into the
///   statement without quotes; `"true"` and `"false"` become `1` and `0`.
/// * `condition` - `(column name, value)` pairs; each value is wrapped in
///   double quotes and the pairs are combined with `AND`.
///
/// Connecting is retried every 200 ms until it succeeds; the update is
/// executed once.
///
/// # Errors
///
/// Returns the error from closing the connection if that fails, otherwise the
/// outcome of the `UPDATE` statement.
pub async fn update_record3(
    table_name: &String,
    record: &Vec<(String, String)>,
    condition: &Vec<(String, String)>,
) -> Result<MySqlQueryResult, Error> {
    let mut sql = format!("UPDATE {} SET ", table_name);
    for (column, value) in record {
        let rendered = match value.as_str() {
            "true" => "1",
            "false" => "0",
            raw => raw,
        };
        sql.push_str(&format!("{}={},", column, rendered));
    }
    // Drops the comma left behind by the last assignment.
    sql.pop();

    sql.push_str(" WHERE ");
    for (column, value) in condition {
        sql.push_str(&format!("{}=\"{}\" AND ", column, value));
    }
    // Strips the final four characters ("AND ") left by the last condition.
    for _ in 0..4 {
        sql.pop();
    }
    sql.push(';');

    // Keep trying until the database accepts a connection.
    let mut conn = loop {
        match sqlx::mysql::MySqlConnection::connect(DB_URL).await {
            Ok(established) => break established,
            Err(_) => sleep(Duration::from_millis(200)).await,
        }
    };

    let outcome = sqlx::query(&sql).execute(&mut conn).await;
    conn.close().await?;
    outcome
}
/// Updates many rows of `table_name` in one statement by joining the table
/// against an inline `SELECT ... UNION ALL SELECT ...` list of new values.
///
/// * `records` - one inner vector per row, `[id, val1, val2, ...]`; the first
///   element of each record must be the id. Values are inserted without quotes.
/// * `columns` - names of the columns to update, excluding `id`, in the same
///   order as the values.
///
/// The first record defines the aliases (`id`, `new_score0`, `new_score1`, ...).
/// In the remaining records `"true"` and `"false"` are written as `1` and `0`.
///
/// Connecting is retried every 200 ms until it succeeds; the update is
/// executed once.
///
/// # Panics
///
/// Panics if the first record holds more values than `columns` has entries.
///
/// # Errors
///
/// Returns the error from closing the connection if that fails, otherwise the
/// outcome of the `UPDATE` statement.
pub async fn update_records(
    table_name: &String,
    records: &Vec<Vec<String>>,
    columns: &Vec<&str>,
) -> Result<MySqlQueryResult, Error> {
    // One alias per updated column: new_score0, new_score1, ...
    let aliases: Vec<String> = (0..columns.len())
        .map(|index| format!("new_score{}", index))
        .collect();

    let mut sql = format!("UPDATE {} s JOIN(", table_name);
    for (row_index, row) in records.iter().enumerate() {
        if row_index == 0 {
            // The first row names the derived table's columns.
            sql.push_str("SELECT ");
            for (position, cell) in row.iter().enumerate() {
                sql.push_str(cell);
                if position == 0 {
                    sql.push_str(" as id, ");
                } else {
                    sql.push_str(" as ");
                    sql.push_str(aliases[position - 1].as_str());
                    sql.push_str(", ");
                }
            }
            // Trim the last two characters (", " after the final alias).
            sql.pop();
            sql.pop();
            sql.push(' ');
        } else {
            sql.push_str("UNION ALL SELECT ");
            for cell in row {
                match cell.as_str() {
                    "true" => sql.push('1'),
                    "false" => sql.push('0'),
                    other => sql.push_str(other),
                }
                sql.push(',');
            }
            // Trim the last character (the comma after the final value).
            sql.pop();
            sql.push(' ');
        }
    }

    sql.push_str(") vals ON s.id = vals.id SET ");
    for (column, alias) in columns.iter().zip(aliases.iter()) {
        sql.push_str(column);
        sql.push_str(" = ");
        sql.push_str(alias);
        sql.push_str(", ");
    }
    // Trim the last two characters (", " after the final assignment).
    sql.pop();
    sql.pop();
    sql.push(';');

    // Keep trying until the database accepts a connection.
    let mut conn = loop {
        match sqlx::mysql::MySqlConnection::connect(DB_URL).await {
            Ok(established) => break established,
            Err(_) => sleep(Duration::from_millis(200)).await,
        }
    };

    let outcome = sqlx::query(&sql).execute(&mut conn).await;
    conn.close().await?;
    outcome
}
/// Deletes rows from `table_name`.
///
/// `condition` is appended verbatim after the table name, separated by a
/// space and followed by `;`.
///
/// Connecting is retried every 200 ms until it succeeds; the delete is
/// executed once.
///
/// # Errors
///
/// Returns the error from closing the connection if that fails, otherwise the
/// outcome of the `DELETE` statement.
pub async fn delete_record(
    table_name: &String,
    condition: &String,
) -> Result<MySqlQueryResult, Error> {
    let statement = format!("DELETE FROM {} {};", table_name, condition);

    // Keep trying until the database accepts a connection.
    let mut conn = loop {
        match sqlx::mysql::MySqlConnection::connect(DB_URL).await {
            Ok(established) => break established,
            Err(_) => sleep(Duration::from_millis(200)).await,
        }
    };

    let outcome = sqlx::query(&statement).execute(&mut conn).await;
    conn.close().await?;
    outcome
}
/// Returns the number of rows in `table_name` (`SELECT COUNT(*)`).
///
/// Connecting is retried every 200 ms until it succeeds; the count query is
/// executed once.
///
/// # Errors
///
/// Returns the query error if the count fails; in that case the function
/// returns before the explicit close is reached. Also returns an error when
/// closing the connection fails.
pub async fn count_rows(table_name: &String) -> Result<i32, Error> {
    #[derive(FromRow)]
    struct Count {
        cnt: i32,
    }

    let statement = format!("SELECT COUNT(*) as cnt FROM {};", table_name);

    // Keep trying until the database accepts a connection.
    let mut conn = loop {
        match sqlx::mysql::MySqlConnection::connect(DB_URL).await {
            Ok(established) => break established,
            Err(_) => sleep(Duration::from_millis(200)).await,
        }
    };

    let row = sqlx::query_as::<_, Count>(&statement)
        .fetch_one(&mut conn)
        .await?;
    conn.close().await?;
    Ok(row.cnt)
}
/// Removes every row from `table_name` with `TRUNCATE TABLE`.
///
/// Connecting and executing are each retried every 200 ms until they succeed.
///
/// # Errors
///
/// Returns an error when closing the connection fails.
pub async fn delete_all_rows(
    table_name: &String,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let query = format!("TRUNCATE TABLE {};", table_name);

    // Keep trying until the database accepts a connection.
    let mut conn = loop {
        match sqlx::mysql::MySqlConnection::connect(DB_URL).await {
            Ok(established) => break established,
            Err(_) => sleep(Duration::from_millis(200)).await,
        }
    };

    // Wait before each retry, so a failure is always followed by a back-off
    // and a success returns immediately.
    while sqlx::query(&query).execute(&mut conn).await.is_err() {
        sleep(Duration::from_millis(200)).await;
    }

    conn.close().await?;
    Ok(())
}
/// Selects rows from `table_name` and keeps retrying until at least one row
/// comes back.
///
/// * `column` - column list for the `SELECT`; lower-cased before use.
/// * `table_name` - lower-cased before use.
/// * `condition` - optional text appended after the table name, separated by
///   a space and followed by `;`.
/// * `data_struct` - not read by this function.
///
/// Connecting is retried every 200 ms until it succeeds. The query is repeated
/// every 200 ms while it fails **or** returns no rows, so this does not return
/// until a matching row exists.
///
/// # Errors
///
/// Returns an error when closing the connection fails.
pub async fn select_record<T>(
    table_name: &String,
    column: &String,
    condition: &Option<String>,
    data_struct: &T,
) -> Result<Vec<T>, Box<dyn std::error::Error + Send + Sync>>
where
    T: for<'r> FromRow<'r, MySqlRow> + Send + Unpin,
{
    let mut sql = format!(
        "SELECT {} FROM {}",
        column.to_lowercase(),
        table_name.to_lowercase()
    );
    if let Some(filter) = condition {
        sql.push_str(&format!(" {};", filter));
    }

    // Keep trying until the database accepts a connection.
    let mut conn = loop {
        match sqlx::mysql::MySqlConnection::connect(DB_URL).await {
            Ok(established) => break established,
            Err(_) => sleep(Duration::from_millis(200)).await,
        }
    };

    // Poll until the query both succeeds and yields at least one row.
    let rows: Vec<T> = loop {
        let attempt = sqlx::query_as::<_, T>(&sql).fetch_all(&mut conn).await;
        match attempt {
            Ok(found) if !found.is_empty() => break found,
            _ => sleep(Duration::from_millis(200)).await,
        }
    };

    conn.close().await?;
    Ok(rows)
}
/// Selects rows from `table_name` with a single attempt at the query.
///
/// * `column` - column list for the `SELECT`; lower-cased before use.
/// * `table_name` - lower-cased before use.
/// * `condition` - optional text appended after the table name, separated by
///   a space and followed by `;`.
/// * `data_struct` - not read by this function.
///
/// Connecting is retried every 200 ms until it succeeds; the query itself is
/// executed once and may return an empty vector.
///
/// # Errors
///
/// Returns an error when the query fails or when closing the connection fails.
pub async fn try_select_record<T>(
    table_name: &String,
    column: &String,
    condition: &Option<String>,
    data_struct: &T,
) -> Result<Vec<T>, Box<dyn std::error::Error + Send + Sync>>
where
    T: for<'r> FromRow<'r, MySqlRow> + Send + Unpin,
{
    let mut sql = format!(
        "SELECT {} FROM {}",
        column.to_lowercase(),
        table_name.to_lowercase()
    );
    if let Some(filter) = condition {
        sql.push_str(&format!(" {};", filter));
    }

    // Keep trying until the database accepts a connection.
    let mut conn = loop {
        match sqlx::mysql::MySqlConnection::connect(DB_URL).await {
            Ok(established) => break established,
            Err(_) => sleep(Duration::from_millis(200)).await,
        }
    };

    let rows: Vec<T> = sqlx::query_as::<_, T>(&sql).fetch_all(&mut conn).await?;
    conn.close().await?;
    Ok(rows)
}