#![allow(unused)] #![allow(warnings)] use crate::coin_health_check_team::request_candles::CandleData; use crate::coin_health_check_team::request_others::CoinPriceData; use crate::database_control::*; use csv::{DeserializeRecordsIter, StringRecord}; use rust_decimal::{prelude::ToPrimitive, Decimal}; use serde::Deserialize; use sqlx::FromRow; use std::sync::Arc; use tokio::{fs::*, sync::Mutex, time::*}; #[derive(Debug, Clone, PartialEq)] pub struct RealtimePriceData { pub opclo_price: f64, pub open_price: f64, pub close_price: f64, pub high_price: f64, pub low_price: f64, pub close_time: i64, pub quote_asset_volume: f64, pub candle_type: String, } impl RealtimePriceData { fn new() -> RealtimePriceData { let data = RealtimePriceData { opclo_price: 0.0, open_price: 0.0, close_price: 0.0, high_price: 0.0, low_price: 0.0, close_time: 0, quote_asset_volume: 0.0, candle_type: String::new(), }; data } } pub async fn update_realtime_price_data( interval: &String, read_candle_for_opclo: &Vec<(String, Vec)>, read_candle_for_rt: &Vec<(String, Vec)>, write_rt_data: &mut Vec<(String, Vec)>, read_price: &Vec, read_symbol: &Vec, ) -> Result<(), Box> { let instant = Instant::now(); let mut rt_price_vec: Vec = Vec::new(); let mut rt_data_vec: Vec<(String, Vec)> = Vec::new(); for element in read_symbol { let candle_search_result = read_candle_for_opclo.iter().position(|x| x.0 == *element); match candle_search_result { Some(T) => { for element in &read_candle_for_opclo[T].1 { let mut realtime_price_data_builder = RealtimePriceData::new(); realtime_price_data_builder.opclo_price = (element.open_price + element.close_price) / 2.0; realtime_price_data_builder.open_price = element.open_price; realtime_price_data_builder.close_price = element.close_price; realtime_price_data_builder.high_price = element.high_price; realtime_price_data_builder.low_price = element.low_price; realtime_price_data_builder.close_time = element.close_time; realtime_price_data_builder.quote_asset_volume = 
element.quote_asset_volume; if element.open_price < element.close_price { realtime_price_data_builder.candle_type = String::from("UP"); } else { realtime_price_data_builder.candle_type = String::from("DOWN"); } rt_price_vec.push(realtime_price_data_builder); } // reflect realtime data to the last element in rt_price_vec if interval.contains("1m") { let price_search_result = read_price.iter().position(|x| x.symbol == *element); if price_search_result.is_some() { // update close_price rt_price_vec.last_mut().unwrap().close_price = read_price[price_search_result.unwrap()].current_price; // update opclo_price rt_price_vec.last_mut().unwrap().opclo_price = (rt_price_vec.last_mut().unwrap().open_price + rt_price_vec.last_mut().unwrap().close_price) / 2.0; // update candle_type if rt_price_vec.last_mut().unwrap().close_price >= rt_price_vec.last_mut().unwrap().open_price { rt_price_vec.last_mut().unwrap().candle_type = String::from("UP"); } else { rt_price_vec.last_mut().unwrap().candle_type = String::from("DOWN"); } // update high_price if rt_price_vec.last_mut().unwrap().high_price < read_price[price_search_result.unwrap()].current_price { rt_price_vec.last_mut().unwrap().high_price = read_price[price_search_result.unwrap()].current_price; } // update low_price if rt_price_vec.last_mut().unwrap().low_price > read_price[price_search_result.unwrap()].current_price { rt_price_vec.last_mut().unwrap().low_price = read_price[price_search_result.unwrap()].current_price; } } } else { // for 30m, uses 1m candle // for 1d, uses 30m candle // for 1w, uses 1d candle // for 1mon, uses 1w candle // search symbol let candle_search_result = read_candle_for_rt.iter().position(|x| x.0 == *element); match candle_search_result { Some(T) => { if read_candle_for_rt[T].1.len() >= 2 { let mut candle_vec_clone = read_candle_for_rt[T].1.clone(); let mut rt_price_vec_clone = rt_price_vec.clone(); rt_price_vec_clone.reverse(); rt_price_vec_clone.truncate(2); rt_price_vec_clone.reverse(); // update 
realtime information for the latest candle let mut update_closeprice = 0.0; let mut update_highprice = 0.0; let mut update_lowprice = 0.0; let mut update_quote_asset_volume = 0.0; // search close time let prev_closetime_result = candle_vec_clone.binary_search_by_key( &rt_price_vec_clone.first().unwrap().close_time, |RealtimePriceData { opclo_price, open_price, close_price, high_price, low_price, close_time, quote_asset_volume, candle_type, }| *close_time, ); if prev_closetime_result.is_ok() { let result = candle_vec_clone.get(prev_closetime_result.unwrap() + 1..); if result.is_some() { let update_highprice_result = result.unwrap().iter().max_by(|x, y| { x.high_price.partial_cmp(&y.high_price).unwrap() }); if update_highprice_result.is_some() { update_highprice = update_highprice_result.unwrap().high_price; } let update_lowprice_result = result.unwrap().iter().min_by(|x, y| { x.low_price.partial_cmp(&y.low_price).unwrap() }); if update_lowprice_result.is_some() { update_lowprice = update_lowprice_result.unwrap().low_price; } for element in result.unwrap() { update_quote_asset_volume += element.quote_asset_volume; } } } let price_search_result = read_price.iter().position(|x| x.symbol == *element); if price_search_result.is_some() { update_closeprice = read_price[price_search_result.unwrap()].current_price; } // update the latest candle with values if update_highprice != 0.0 && !update_highprice.is_nan() && update_highprice.is_finite() { rt_price_vec.last_mut().unwrap().high_price = update_highprice; } if update_lowprice != 0.0 && !update_lowprice.is_nan() && update_lowprice.is_finite() { rt_price_vec.last_mut().unwrap().low_price = update_lowprice; } if update_quote_asset_volume != 0.0 && !update_quote_asset_volume.is_nan() && update_quote_asset_volume.is_finite() { rt_price_vec.last_mut().unwrap().quote_asset_volume = update_quote_asset_volume; } if update_closeprice != 0.0 && !update_closeprice.is_nan() && update_closeprice.is_finite() { 
rt_price_vec.last_mut().unwrap().close_price = update_closeprice; rt_price_vec.last_mut().unwrap().opclo_price = (update_closeprice + rt_price_vec.last_mut().unwrap().open_price) / 2.0; } } } None => {} } } rt_data_vec.push((element.clone(), rt_price_vec.clone())); rt_price_vec.clear(); } None => {} } } *write_rt_data = rt_data_vec; // println!(" datapoints/price_{} 완료 elapsed:{:.2}s", interval, instant.elapsed().as_secs_f32()); Ok(()) } // [price_data] // pub async fn price_data(candle_period: &str) -> Result<(), Box> { // let instant = Instant::now(); // // select the whole symbol // let mut usdt_trades = UsdtTrades { symbol: String::new() }; // let valid_usdt_table_name = String::from("valid_usdt_trades"); // let valid_usdt_column_name = String::from("symbol"); // let table_condition = None; // let mut select_result = select_record(&valid_usdt_table_name, &valid_usdt_column_name, &table_condition, &usdt_trades).await?; // let mut candle_table_name = String::from("candle_"); // let mut table_name_build = String::new(); // let column_name = String::from("openprice, highprice, lowprice, closeprice, closetime, quoteassetvolume"); // let table_condition = None; // let mut candle_data_struct = CandleData { openprice: 0.0, highprice: 0.0, lowprice: 0.0, closeprice: 0.0, closetime: 0, quoteassetvolume: 0.0 }; // let mut output_path = String::from("datapoints/price/price_"); // output_path.push_str(candle_period); // output_path.push('_'); // let mut output_path_build = String::new(); // let mut content_build = String::new(); // match candle_period { // "1m" => { // for usdttrade in select_result { // content_build.clear(); // table_name_build.clear(); // table_name_build.push_str(candle_table_name.as_str()); // table_name_build.push_str(usdttrade.symbol.to_lowercase().as_str()); // table_name_build.push('_'); // table_name_build.push_str(candle_period); // output_path_build.clear(); // output_path_build.push_str(output_path.as_str()); // 
//                 output_path_build.push_str(usdttrade.symbol.as_str());
//                 output_path_build.push_str(".csv");
//                 let mut query_result = select_record(&table_name_build, &column_name, &table_condition, &candle_data_struct).await?;
//                 let mut file = tokio::fs::OpenOptions::new().create(true).write(true).truncate(true).open(output_path_build.as_str()).await;
//                 while let Err(e) = file {
//                     file = tokio::fs::OpenOptions::new().create(true).write(true).truncate(true).open(output_path_build.as_str()).await;
//                     sleep(Duration::from_millis(1000));
//                 }
//                 if file.is_ok() {
//                     for element2 in query_result {
//                         let meanprice = (element2.openprice + element2.closeprice)/2.0;
//                         content_build.push_str(meanprice.to_string().as_str()); // opclo price
//                         content_build.push(',');
//                         content_build.push_str(element2.openprice.to_string().as_str()); // openprice
//                         content_build.push(',');
//                         content_build.push_str(element2.closeprice.to_string().as_str()); // closeprice
//                         content_build.push(',');
//                         content_build.push_str(element2.highprice.to_string().as_str()); // highprice
//                         content_build.push(',');
//                         content_build.push_str(element2.lowprice.to_string().as_str()); // lowprice
//                         content_build.push(',');
//                         content_build.push_str(element2.closetime.to_string().as_str()); // closetime
//                         content_build.push(',');
//                         content_build.push_str(element2.quoteassetvolume.to_string().as_str()); // quote asset volume (USDT)
//                         content_build.push(',');
//                         if element2.closeprice >= element2.openprice {
//                             content_build.push_str("UP"); // UP candle
//                         } else {
//                             content_build.push_str("DOWN"); // DOWN candle
//                         }
//                         content_build.push('\n');
//                     }
//                     content_build.pop();
//                     file?.write_all(content_build.as_bytes()).await;
//                 } else if file.is_err() {
//                     println!(">>> File error occurred {:?} (datapoints/price_{})", file.unwrap_err(), candle_period);
//                 }
//             }
//         },
//         _ => {
//             // read price
//             let mut read_fixed_path = String::from("datapoints/price/price_");
//             match candle_period {
//                 "30m" => {read_fixed_path.push_str("1m");},
//                 "1d" => {read_fixed_path.push_str("30m");},
//                 "1w" => {read_fixed_path.push_str("1d");},
//                 "1mon" => {read_fixed_path.push_str("1d");},
//                 _ => { println!(">>> Wrong candle_period. Check the parameter available (datapoints/price_{})", candle_period); },
//             }
//             read_fixed_path.push('_');
//             let mut read_path_build = String::new();
//             let mut read_data_vec: Vec = Vec::new();
//             // select current coin prices
//             let coinprice_table_name = String::from("coinprices");
//             let coinprice_column_name = String::from("symbol, price");
//             let mut condition = None;
//             let mut select_data_structure = CoinPriceData { symbol: String::new(), price: Decimal::new(0,8) };
//             let mut coinprice_select_result = select_record(&coinprice_table_name, &coinprice_column_name, &condition, &select_data_structure).await.unwrap();
//             for usdttrade in select_result {
//                 read_path_build.clear();
//                 read_path_build.push_str(read_fixed_path.as_str());
//                 read_path_build.push_str(usdttrade.symbol.as_str());
//                 read_path_build.push_str(".csv");
//                 let mut price_record = StringRecord::new();
//                 let mut rdr = csv::ReaderBuilder::new()
//                     .has_headers(false)
//                     .from_path(&read_path_build)?;
//                 while let false = rdr.read_record(&mut price_record)? {
//                     rdr = csv::ReaderBuilder::new()
//                         .has_headers(false)
//                         .from_path(&read_path_build)?;
//                     sleep(Duration::from_millis(1000));
//                 }
//                 read_data_vec.clear();
//                 if price_record[5].parse::()? != 0 { // when the file contains valid data
//                     read_data_vec.push(PriceData {
//                         opclo_price: price_record[0].parse::()?,
//                         open_price: price_record[1].parse::()?,
//                         close_price: price_record[2].parse::()?,
//                         high_price: price_record[3].parse::()?,
//                         low_price: price_record[4].parse::()?,
//                         close_time: price_record[5].parse::()?,
//                         quote_asset_volume: price_record[6].parse::()?,
//                         candle_type: price_record[7].parse::()?,
//                     });
//                     for element in rdr.deserialize() {
//                         read_data_vec.push(element?);
//                     }
//                     content_build.clear();
//                     table_name_build.clear();
//                     table_name_build.push_str(candle_table_name.as_str());
//                     table_name_build.push_str(usdttrade.symbol.to_lowercase().as_str());
//                     table_name_build.push('_');
//                     table_name_build.push_str(candle_period);
//                     output_path_build.clear();
//                     output_path_build.push_str(output_path.as_str());
//                     output_path_build.push_str(usdttrade.symbol.as_str());
//                     output_path_build.push_str(".csv");
//                     let mut query_result = select_record(&table_name_build, &column_name, &table_condition, &candle_data_struct).await?;
//                     if query_result.len() >= 2 {
//                         // update realtime information for the latest candle
//                         let mut update_closeprice = 0.0;
//                         let mut update_highprice = 0.0;
//                         let mut update_lowprice = 0.0;
//                         let mut update_quote_asset_volume = 0.0;
//                         let mut query_result_copy = query_result.clone();
//                         query_result_copy.reverse();
//                         query_result_copy.truncate(2);
//                         query_result_copy.reverse();
//                         let prev_closetime_result = read_data_vec.binary_search_by_key(&query_result_copy.first().unwrap().closetime, |PriceData {opclo_price, open_price, close_price, high_price, low_price, close_time, quote_asset_volume, candle_type}|*close_time as i64);
//                         if prev_closetime_result.is_ok() {
//                             let result = read_data_vec.get(prev_closetime_result.unwrap()+1..);
//                             if result.is_some() {
//                                 let update_highprice_result = result.unwrap().iter().max_by(|x, y| x.high_price.partial_cmp(&y.high_price).unwrap());
//                                 if update_highprice_result.is_some() {
//                                     update_highprice = update_highprice_result.unwrap().high_price;
//                                 }
//                                 let update_lowprice_result = result.unwrap().iter().min_by(|x, y| x.low_price.partial_cmp(&y.low_price).unwrap());
//                                 if update_lowprice_result.is_some() {
//                                     update_lowprice = update_lowprice_result.unwrap().low_price;
//                                 }
//                                 for element in result.unwrap() {
//                                     update_quote_asset_volume += element.quote_asset_volume;
//                                 }
//                             }
//                         }
//                         let coinprice_result = coinprice_select_result.iter().find(|x| x.symbol==usdttrade.symbol);
//                         if coinprice_result.is_some() {
//                             update_closeprice = coinprice_result.unwrap().price.to_f64().unwrap();
//                         }
//                         // update the latest candle with values
//                         if let Some(last) = query_result.last_mut() {
//                             if update_highprice != 0.0 && !update_highprice.is_nan() && update_highprice.is_finite() {
//                                 last.highprice = update_highprice;
//                             }
//                             if update_lowprice != 0.0 && !update_lowprice.is_nan() && update_lowprice.is_finite() {
//                                 last.lowprice = update_lowprice;
//                             }
//                             if update_quote_asset_volume != 0.0 && !update_quote_asset_volume.is_nan() && update_quote_asset_volume.is_finite() {
//                                 last.quoteassetvolume = update_quote_asset_volume;
//                             }
//                             if update_closeprice != 0.0 && !update_closeprice.is_nan() && update_closeprice.is_finite() {
//                                 last.closeprice = update_closeprice;
//                             }
//                         }
//                         let mut file = tokio::fs::OpenOptions::new().create(true).write(true).truncate(true).open(output_path_build.as_str()).await;
//                         while let Err(e) = file {
//                             file = tokio::fs::OpenOptions::new().create(true).write(true).truncate(true).open(output_path_build.as_str()).await;
//                             sleep(Duration::from_millis(1000));
//                         }
//                         if file.is_ok() {
//                             for element2 in query_result {
//                                 let meanprice = (element2.openprice + element2.closeprice)/2.0;
//                                 content_build.push_str(meanprice.to_string().as_str()); // opclo price
//                                 content_build.push(',');
//                                 content_build.push_str(element2.openprice.to_string().as_str()); // openprice
//                                 content_build.push(',');
//                                 content_build.push_str(element2.closeprice.to_string().as_str()); // closeprice
//                                 content_build.push(',');
//                                 content_build.push_str(element2.highprice.to_string().as_str()); // highprice
//                                 content_build.push(',');
//                                 content_build.push_str(element2.lowprice.to_string().as_str()); // lowprice
//                                 content_build.push(',');
//                                 content_build.push_str(element2.closetime.to_string().as_str()); // closetime
//                                 content_build.push(',');
//                                 content_build.push_str(element2.quoteassetvolume.to_string().as_str()); // quote asset volume (USDT)
//                                 content_build.push(',');
//                                 if element2.closeprice >= element2.openprice {
//                                     content_build.push_str("UP"); // UP candle
//                                 } else {
//                                     content_build.push_str("DOWN"); // DOWN candle
//                                 }
//                                 content_build.push('\n');
//                             }
//                             content_build.pop();
//                             file?.write_all(content_build.as_bytes()).await;
//                         } else if file.is_err() {
//                             println!(">>> File error occurred {:?} (datapoints/price_{})", file.unwrap_err(), candle_period);
//                         }
//                     }
//                 }
//             }
//         }
//     }
//     println!(" datapoints/price_{} 완료 elapsed:{:.2}s", candle_period, instant.elapsed().as_secs_f32());
//     Ok(())
// }