diff --git a/src/coex/strategy_team.rs b/src/coex/strategy_team.rs index 4cce8b6..3c075ed 100644 --- a/src/coex/strategy_team.rs +++ b/src/coex/strategy_team.rs @@ -14,13 +14,15 @@ use crate::value_estimation_team::indicators::rsi::RsiData; use crate::value_estimation_team::indicators::sma::SmaData; use crate::value_estimation_team::indicators::stoch_rsi::{StochRsiDData, StochRsiKData}; use crate::value_estimation_team::indicators::supertrend::{supertrend, SupertrendData}; +use futures::future::try_join_all; use csv::{DeserializeRecordsIter, StringRecord}; use rust_decimal::prelude::ToPrimitive; use rust_decimal::Decimal; use serde::Deserialize; use sqlx::FromRow; -use std::cmp::Ordering; +use std::{cmp::Ordering, sync::Arc}; use tokio::time::{sleep, Duration, Instant}; +use tokio::sync::Mutex; use crate::signal_association::signal_decision::*; @@ -31,6 +33,7 @@ pub enum MA { Tema, } +#[derive(Debug)] pub struct AllData { pub price_vec: Vec, pub valid_symbol_vec: Vec, @@ -927,123 +930,161 @@ pub async fn execute_strategists( pub async fn execute_strategist_for_test_temp( alldata: &AllData, ) -> Result<(), Box> { + + // 1st filtering: supertrend(ATR period 10, multiplier: 1.3, 30m close price), the area should be in SELL area. - let mut filtered_2nd_symbols: Vec<(String, i64)> = Vec::new(); // (symbol, closetime) - let mut opclo_30m_vec: Vec = Vec::new(); - let mut supertrend_vec: Vec = Vec::new(); - for symbol in &alldata.valid_symbol_vec { - let opclo_30m_option = alldata - .rt_price_30m_vec - .iter() - .position(|x| *x.0 == *symbol); - let supertrend_option_30m = - supertrend(&symbol, &alldata.rt_price_30m_vec, 10, 1.3, true).await; + let mut filtered_2nd_symbols: Vec<(String, i64)> = Vec::new(); + let mut filtered_2nd_symbols_arc: Arc>> = Arc::new(Mutex::new(filtered_2nd_symbols)); // (symbol, closetime) + let mut task_vec = Vec::new(); + let valid_symbol_vec_c = alldata.valid_symbol_vec.clone(); + for symbol in valid_symbol_vec_c { + let mut opclo_30m_vec: Vec = Vec::new(); + let mut supertrend_vec: Vec = Vec::new(); + let rt_price_30m_vec_c = alldata.rt_price_30m_vec.clone(); + let filtered_2nd_symbols_arc_c = Arc::clone(&filtered_2nd_symbols_arc); + task_vec.push(tokio::spawn(async move { + let opclo_30m_option = rt_price_30m_vec_c.iter() + .position(|x| *x.0 == symbol); + let supertrend_option_30m = + supertrend(&symbol, &rt_price_30m_vec_c, 10, 1.3, true).await; - if opclo_30m_option.is_some() && supertrend_option_30m.is_some() { - opclo_30m_vec = alldata.rt_price_30m_vec[opclo_30m_option.unwrap()] - .1 - .clone(); - supertrend_vec = supertrend_option_30m.unwrap(); - - if opclo_30m_vec.len() >= 3 && supertrend_vec.len() >= 3 { - let supertrend_search_result = supertrend_vec.binary_search_by_key( - &opclo_30m_vec.last().unwrap().close_time, - |SupertrendData { - band_value, - signal, - area, - close_time, - }| *close_time, - ); - if supertrend_search_result.is_ok() { - if supertrend_vec[supertrend_search_result.unwrap()] - .area - .contains("DOWN") - { - filtered_2nd_symbols - .push((symbol.clone(), opclo_30m_vec.last().unwrap().close_time)); + if opclo_30m_option.is_some() && supertrend_option_30m.is_some() { + opclo_30m_vec = rt_price_30m_vec_c[opclo_30m_option.unwrap()] + .1 + .clone(); + supertrend_vec = supertrend_option_30m.unwrap(); + + if opclo_30m_vec.len() >= 3 && supertrend_vec.len() >= 3 { + let supertrend_search_result = supertrend_vec.binary_search_by_key( + &opclo_30m_vec.last().unwrap().close_time, + |SupertrendData { + band_value, + signal, + area, + close_time, + }| *close_time, + ); + if supertrend_search_result.is_ok() { + if supertrend_vec[supertrend_search_result.unwrap()] + .area + .contains("DOWN") + { + let mut filtered_2nd_symbols_lock = filtered_2nd_symbols_arc_c.lock().await; + filtered_2nd_symbols_lock + .push((symbol.clone(), opclo_30m_vec.last().unwrap().close_time)); + } } } } - } + })); } + try_join_all(task_vec).await?; // 2nd filtering: lookup tables if the tradepair is already there let inspect_table_name_1 = String::from("buy_ordered_coin_list"); let inspect_table_name_2 = String::from("sell_ordered_coin_list"); let inspect_table_name_3 = String::from("pre_suggested_coin_list"); let inspect_table_name_4 = String::from("suggested_coin_list"); - let mut filtered_3rd_symbols: Vec<(String, i64)> = Vec::new(); // (symbol, closetime) - for element in filtered_2nd_symbols { + let mut filtered_3rd_symbols: Vec<(String, i64)> = Vec::new(); + let mut filtered_3rd_symbols_arc: Arc>> = Arc::new(Mutex::new(filtered_3rd_symbols)); // (symbol, closetime) + let mut task_vec = Vec::new(); + + let filtered_2nd_iter = filtered_2nd_symbols_arc.lock().await.clone().into_iter(); + for element in filtered_2nd_iter { let mut exists_condition_build = String::from("symbol=\'"); exists_condition_build.push_str(element.0.as_str()); exists_condition_build.push_str("\' AND close_time="); exists_condition_build.push_str(element.1.to_string().as_str()); let exists_condition = Some(exists_condition_build); + let exists_condition_c = exists_condition.clone(); + let inspect_table_name_1_c = inspect_table_name_1.clone(); + let inspect_table_name_2_c = inspect_table_name_2.clone(); + let inspect_table_name_3_c = inspect_table_name_3.clone(); + let inspect_table_name_4_c = inspect_table_name_4.clone(); + let element_c = element.clone(); + let filtered_3rd_symbols_arc_c = Arc::clone(&filtered_3rd_symbols_arc); + task_vec.push(tokio::spawn(async move { + let inspect_result_1 = exists_record(&inspect_table_name_1_c, &exists_condition_c).await; + let inspect_result_2 = exists_record(&inspect_table_name_2_c, &exists_condition_c).await; + let inspect_result_3 = exists_record(&inspect_table_name_3_c, &exists_condition_c).await; + let inspect_result_4 = exists_record(&inspect_table_name_4_c, &exists_condition_c).await; - let inspect_result_1 = exists_record(&inspect_table_name_1, &exists_condition).await; - let inspect_result_2 = exists_record(&inspect_table_name_2, &exists_condition).await; - let inspect_result_3 = exists_record(&inspect_table_name_3, &exists_condition).await; - let inspect_result_4 = exists_record(&inspect_table_name_4, &exists_condition).await; - - if inspect_result_1 == false - && inspect_result_2 == false - && inspect_result_3 == false - && inspect_result_4 == false - { - filtered_3rd_symbols.push(element); - } + if inspect_result_1 == false + && inspect_result_2 == false + && inspect_result_3 == false + && inspect_result_4 == false + { + let mut filtered_3rd_symbols_lock = filtered_3rd_symbols_arc_c.lock().await; + filtered_3rd_symbols_lock + .push(element_c); + } + })); } + try_join_all(task_vec).await?; // 3rd filtering: BollingerBand (length 10, stddev: 3.0, 30m close price) the current price should be under the lowerband of BB. - let sma_30m_data: Vec<(String, Vec)> = value_estimation_team::indicators::sma::sma( - 30, + let filtered_3rd_iter = filtered_3rd_symbols_arc.lock().await.clone().into_iter(); + let filtered_3rd_symbols_mutex = filtered_3rd_symbols_arc.lock().await; + let sma10_30m_data: Vec<(String, Vec)> = value_estimation_team::indicators::sma::sma( + 10, &alldata.rt_price_30m_vec, - &alldata.valid_symbol_vec, + &filtered_3rd_symbols_mutex, + ) + .await?; + + let bb10_30m_data: Vec<(String, Vec)> = value_estimation_team::indicators::bollingerband::bollingerband( + 10, + 3.0, + &sma10_30m_data, + &alldata.rt_price_30m_vec, + &filtered_3rd_symbols_mutex, ) .await?; - let bb10_30m_data: Vec<(String, Vec)> = - value_estimation_team::indicators::bollingerband::bollingerband( - 10, - 3.0, - &sma_30m_data, - &alldata.rt_price_1m_vec, - &alldata.valid_symbol_vec, - ) - .await?; - let mut bb10_30m_vec: Vec = Vec::new(); - let mut filtered_4th_symbols: Vec<(String, i64)> = Vec::new(); // (symbol, closetime) - for element in filtered_3rd_symbols { + let mut task_vec = Vec::new(); + let mut filtered_4th_symbols: Vec<(String, i64)> = Vec::new(); + let mut filtered_4th_symbols_arc: Arc>> = Arc::new(Mutex::new(filtered_4th_symbols)); // (symbol, closetime) + for element in filtered_3rd_iter { + let mut bb10_30m_vec: Vec = Vec::new(); let bb10_30m_option = bb10_30m_data.iter().position(|x| *x.0 == element.0); - - if bb10_30m_option.is_some() { - bb10_30m_vec = bb10_30m_data[bb10_30m_option.unwrap()].1.clone(); - + let bb10_30m_option_c = bb10_30m_option.clone(); + let element_c = element.clone(); + let filtered_4th_symbols_arc_c = Arc::clone(&filtered_4th_symbols_arc); + + if bb10_30m_option_c.is_some() { + bb10_30m_vec = bb10_30m_data[bb10_30m_option_c.unwrap()].1.clone(); + if bb10_30m_vec.len() >= 3 { - let bb_search_result = bb10_30m_vec.binary_search_by_key( - &element.1, - |&BollingerBandData { - sma, - upperband, - lowerband, - close_time, - }| close_time, - ); - if bb_search_result.is_ok() { - let current_price = get_current_price(&element.0, &alldata.price_vec) - .await - .unwrap(); - if bb10_30m_vec[bb_search_result.unwrap()].lowerband > current_price { - filtered_4th_symbols.push(element); + let bb10_30m_vec_c = bb10_30m_vec.clone(); + let current_price = get_current_price(&element_c.0, &alldata.price_vec) + .await + .unwrap(); + task_vec.push(tokio::spawn(async move { + let bb_search_result = bb10_30m_vec_c.binary_search_by_key( + &element_c.1, + |&BollingerBandData { + sma, + upperband, + lowerband, + close_time, + }| close_time, + ); + if bb_search_result.is_ok() { + if bb10_30m_vec_c[bb_search_result.unwrap()].lowerband > current_price { + let mut filtered_4th_symbols_lock = filtered_4th_symbols_arc_c.lock().await; + filtered_4th_symbols_lock + .push(element_c); + } } - } + })); } } } - + try_join_all(task_vec).await?; + // 4th filtering: RSI (length: 10, 30m close price) the current index should be lower than 30. + let filtered_4th_iter = filtered_4th_symbols_arc.lock().await.clone().into_iter(); let mut rsi10_30m_data: Vec<(String, Vec)> = Vec::new(); value_estimation_team::indicators::rsi::rsi( 10, @@ -1054,7 +1095,7 @@ pub async fn execute_strategist_for_test_temp( .await?; let mut rsi10_30m_vec: Vec = Vec::new(); let mut filtered_5th_symbols: Vec<(String, i64)> = Vec::new(); // (symbol, closetime) - for element in filtered_4th_symbols { + for element in filtered_4th_iter { let rsi10_30m_option = rsi10_30m_data.iter().position(|x| *x.0 == element.0); if rsi10_30m_option.is_some() { @@ -1076,23 +1117,23 @@ pub async fn execute_strategist_for_test_temp( } } } - + // 5th filtering: heatmap volume(MA length 10, std length 10, 30m close price), the current candle should be over than high at least. let mut filtered_6th_symbols: Vec<(String, i64)> = Vec::new(); // (symbol, closetime) for element in filtered_5th_symbols { - let opclo_30m_option = alldata - .rt_price_30m_vec + let rt_price_30m_vec_c = alldata.rt_price_30m_vec.clone(); + let opclo_30m_option = rt_price_30m_vec_c .iter() .position(|x| *x.0 == element.0); if opclo_30m_option.is_some() { - opclo_30m_vec = alldata.rt_price_30m_vec[opclo_30m_option.unwrap()] + let opclo_30m_vec = rt_price_30m_vec_c[opclo_30m_option.unwrap()] .1 .clone(); if opclo_30m_vec.len() >= 3 { let heatmap_volume_option = heatmap_volume( &element.0, - &alldata.rt_price_30m_vec, + &rt_price_30m_vec_c, 10, 10, 4.0, @@ -1126,7 +1167,7 @@ pub async fn execute_strategist_for_test_temp( } } } - + // 6th filtering condition: MACD // let mut opclo_30m_vec: Vec = Vec::new(); // let mut ema3_1d_vec: &Vec = &Vec::new(); @@ -1154,7 +1195,7 @@ pub async fn execute_strategist_for_test_temp( // } // } - insert_pre_suggested_coins(1, &filtered_6th_symbols, alldata).await; + // insert_pre_suggested_coins(1, &filtered_6th_symbols, alldata).await; Ok(()) }