Apply threading for filterings

This commit is contained in:
Sik Yoon 2023-07-22 20:27:36 +09:00
parent 5f8f613042
commit 95c7dd4b23

View File

@ -14,13 +14,15 @@ use crate::value_estimation_team::indicators::rsi::RsiData;
use crate::value_estimation_team::indicators::sma::SmaData;
use crate::value_estimation_team::indicators::stoch_rsi::{StochRsiDData, StochRsiKData};
use crate::value_estimation_team::indicators::supertrend::{supertrend, SupertrendData};
use futures::future::try_join_all;
use csv::{DeserializeRecordsIter, StringRecord};
use rust_decimal::prelude::ToPrimitive;
use rust_decimal::Decimal;
use serde::Deserialize;
use sqlx::FromRow;
use std::cmp::Ordering;
use std::{cmp::Ordering, sync::Arc};
use tokio::time::{sleep, Duration, Instant};
use tokio::sync::Mutex;
use crate::signal_association::signal_decision::*;
@ -31,6 +33,7 @@ pub enum MA {
Tema,
}
#[derive(Debug)]
pub struct AllData {
pub price_vec: Vec<CoinPriceData>,
pub valid_symbol_vec: Vec<String>,
@ -927,123 +930,161 @@ pub async fn execute_strategists(
pub async fn execute_strategist_for_test_temp(
alldata: &AllData,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// 1st filtering: supertrend(ATR period 10, multiplier: 1.3, 30m close price), the area should be in SELL area.
let mut filtered_2nd_symbols: Vec<(String, i64)> = Vec::new(); // (symbol, closetime)
let mut opclo_30m_vec: Vec<RealtimePriceData> = Vec::new();
let mut supertrend_vec: Vec<SupertrendData> = Vec::new();
for symbol in &alldata.valid_symbol_vec {
let opclo_30m_option = alldata
.rt_price_30m_vec
.iter()
.position(|x| *x.0 == *symbol);
let supertrend_option_30m =
supertrend(&symbol, &alldata.rt_price_30m_vec, 10, 1.3, true).await;
let mut filtered_2nd_symbols: Vec<(String, i64)> = Vec::new();
let mut filtered_2nd_symbols_arc: Arc<Mutex<Vec<(String, i64)>>> = Arc::new(Mutex::new(filtered_2nd_symbols)); // (symbol, closetime)
let mut task_vec = Vec::new();
let valid_symbol_vec_c = alldata.valid_symbol_vec.clone();
for symbol in valid_symbol_vec_c {
let mut opclo_30m_vec: Vec<RealtimePriceData> = Vec::new();
let mut supertrend_vec: Vec<SupertrendData> = Vec::new();
let rt_price_30m_vec_c = alldata.rt_price_30m_vec.clone();
let filtered_2nd_symbols_arc_c = Arc::clone(&filtered_2nd_symbols_arc);
task_vec.push(tokio::spawn(async move {
let opclo_30m_option = rt_price_30m_vec_c.iter()
.position(|x| *x.0 == symbol);
let supertrend_option_30m =
supertrend(&symbol, &rt_price_30m_vec_c, 10, 1.3, true).await;
if opclo_30m_option.is_some() && supertrend_option_30m.is_some() {
opclo_30m_vec = alldata.rt_price_30m_vec[opclo_30m_option.unwrap()]
.1
.clone();
supertrend_vec = supertrend_option_30m.unwrap();
if opclo_30m_option.is_some() && supertrend_option_30m.is_some() {
opclo_30m_vec = rt_price_30m_vec_c[opclo_30m_option.unwrap()]
.1
.clone();
supertrend_vec = supertrend_option_30m.unwrap();
if opclo_30m_vec.len() >= 3 && supertrend_vec.len() >= 3 {
let supertrend_search_result = supertrend_vec.binary_search_by_key(
&opclo_30m_vec.last().unwrap().close_time,
|SupertrendData {
band_value,
signal,
area,
close_time,
}| *close_time,
);
if supertrend_search_result.is_ok() {
if supertrend_vec[supertrend_search_result.unwrap()]
.area
.contains("DOWN")
{
filtered_2nd_symbols
.push((symbol.clone(), opclo_30m_vec.last().unwrap().close_time));
if opclo_30m_vec.len() >= 3 && supertrend_vec.len() >= 3 {
let supertrend_search_result = supertrend_vec.binary_search_by_key(
&opclo_30m_vec.last().unwrap().close_time,
|SupertrendData {
band_value,
signal,
area,
close_time,
}| *close_time,
);
if supertrend_search_result.is_ok() {
if supertrend_vec[supertrend_search_result.unwrap()]
.area
.contains("DOWN")
{
let mut filtered_2nd_symbols_lock = filtered_2nd_symbols_arc_c.lock().await;
filtered_2nd_symbols_lock
.push((symbol.clone(), opclo_30m_vec.last().unwrap().close_time));
}
}
}
}
}
}));
}
try_join_all(task_vec).await?;
// 2nd filtering: lookup tables if the tradepair is already there
let inspect_table_name_1 = String::from("buy_ordered_coin_list");
let inspect_table_name_2 = String::from("sell_ordered_coin_list");
let inspect_table_name_3 = String::from("pre_suggested_coin_list");
let inspect_table_name_4 = String::from("suggested_coin_list");
let mut filtered_3rd_symbols: Vec<(String, i64)> = Vec::new(); // (symbol, closetime)
for element in filtered_2nd_symbols {
let mut filtered_3rd_symbols: Vec<(String, i64)> = Vec::new();
let mut filtered_3rd_symbols_arc: Arc<Mutex<Vec<(String, i64)>>> = Arc::new(Mutex::new(filtered_3rd_symbols)); // (symbol, closetime)
let mut task_vec = Vec::new();
let filtered_2nd_iter = filtered_2nd_symbols_arc.lock().await.clone().into_iter();
for element in filtered_2nd_iter {
let mut exists_condition_build = String::from("symbol=\'");
exists_condition_build.push_str(element.0.as_str());
exists_condition_build.push_str("\' AND close_time=");
exists_condition_build.push_str(element.1.to_string().as_str());
let exists_condition = Some(exists_condition_build);
let exists_condition_c = exists_condition.clone();
let inspect_table_name_1_c = inspect_table_name_1.clone();
let inspect_table_name_2_c = inspect_table_name_2.clone();
let inspect_table_name_3_c = inspect_table_name_3.clone();
let inspect_table_name_4_c = inspect_table_name_4.clone();
let element_c = element.clone();
let filtered_3rd_symbols_arc_c = Arc::clone(&filtered_3rd_symbols_arc);
task_vec.push(tokio::spawn(async move {
let inspect_result_1 = exists_record(&inspect_table_name_1_c, &exists_condition_c).await;
let inspect_result_2 = exists_record(&inspect_table_name_2_c, &exists_condition_c).await;
let inspect_result_3 = exists_record(&inspect_table_name_3_c, &exists_condition_c).await;
let inspect_result_4 = exists_record(&inspect_table_name_4_c, &exists_condition_c).await;
let inspect_result_1 = exists_record(&inspect_table_name_1, &exists_condition).await;
let inspect_result_2 = exists_record(&inspect_table_name_2, &exists_condition).await;
let inspect_result_3 = exists_record(&inspect_table_name_3, &exists_condition).await;
let inspect_result_4 = exists_record(&inspect_table_name_4, &exists_condition).await;
if inspect_result_1 == false
&& inspect_result_2 == false
&& inspect_result_3 == false
&& inspect_result_4 == false
{
filtered_3rd_symbols.push(element);
}
if inspect_result_1 == false
&& inspect_result_2 == false
&& inspect_result_3 == false
&& inspect_result_4 == false
{
let mut filtered_3rd_symbols_lock = filtered_3rd_symbols_arc_c.lock().await;
filtered_3rd_symbols_lock
.push(element_c);
}
}));
}
try_join_all(task_vec).await?;
// 3rd filtering: BollingerBand (length 10, stddev: 3.0, 30m close price) the current price should be under the lowerband of BB.
let sma_30m_data: Vec<(String, Vec<SmaData>)> = value_estimation_team::indicators::sma::sma(
30,
let filtered_3rd_iter = filtered_3rd_symbols_arc.lock().await.clone().into_iter();
let filtered_3rd_symbols_mutex = filtered_3rd_symbols_arc.lock().await;
let sma10_30m_data: Vec<(String, Vec<SmaData>)> = value_estimation_team::indicators::sma::sma(
10,
&alldata.rt_price_30m_vec,
&alldata.valid_symbol_vec,
&filtered_3rd_symbols_mutex,
)
.await?;
let bb10_30m_data: Vec<(String, Vec<BollingerBandData>)> =
value_estimation_team::indicators::bollingerband::bollingerband(
10,
3.0,
&sma_30m_data,
&alldata.rt_price_1m_vec,
&alldata.valid_symbol_vec,
)
.await?;
let mut bb10_30m_vec: Vec<BollingerBandData> = Vec::new();
let mut filtered_4th_symbols: Vec<(String, i64)> = Vec::new(); // (symbol, closetime)
for element in filtered_3rd_symbols {
let bb10_30m_option = bb10_30m_data.iter().position(|x| *x.0 == element.0);
let bb10_30m_data: Vec<(String, Vec<BollingerBandData>)> = value_estimation_team::indicators::bollingerband::bollingerband(
10,
3.0,
&sma10_30m_data,
&alldata.rt_price_30m_vec,
&filtered_3rd_symbols_mutex,
)
.await?;
if bb10_30m_option.is_some() {
bb10_30m_vec = bb10_30m_data[bb10_30m_option.unwrap()].1.clone();
let mut task_vec = Vec::new();
let mut filtered_4th_symbols: Vec<(String, i64)> = Vec::new();
let mut filtered_4th_symbols_arc: Arc<Mutex<Vec<(String, i64)>>> = Arc::new(Mutex::new(filtered_4th_symbols)); // (symbol, closetime)
for element in filtered_3rd_iter {
let mut bb10_30m_vec: Vec<BollingerBandData> = Vec::new();
let bb10_30m_option = bb10_30m_data.iter().position(|x| *x.0 == element.0);
let bb10_30m_option_c = bb10_30m_option.clone();
let element_c = element.clone();
let filtered_4th_symbols_arc_c = Arc::clone(&filtered_4th_symbols_arc);
if bb10_30m_option_c.is_some() {
bb10_30m_vec = bb10_30m_data[bb10_30m_option_c.unwrap()].1.clone();
if bb10_30m_vec.len() >= 3 {
let bb_search_result = bb10_30m_vec.binary_search_by_key(
&element.1,
|&BollingerBandData {
sma,
upperband,
lowerband,
close_time,
}| close_time,
);
if bb_search_result.is_ok() {
let current_price = get_current_price(&element.0, &alldata.price_vec)
.await
.unwrap();
if bb10_30m_vec[bb_search_result.unwrap()].lowerband > current_price {
filtered_4th_symbols.push(element);
let bb10_30m_vec_c = bb10_30m_vec.clone();
let current_price = get_current_price(&element_c.0, &alldata.price_vec)
.await
.unwrap();
task_vec.push(tokio::spawn(async move {
let bb_search_result = bb10_30m_vec_c.binary_search_by_key(
&element_c.1,
|&BollingerBandData {
sma,
upperband,
lowerband,
close_time,
}| close_time,
);
if bb_search_result.is_ok() {
if bb10_30m_vec_c[bb_search_result.unwrap()].lowerband > current_price {
let mut filtered_4th_symbols_lock = filtered_4th_symbols_arc_c.lock().await;
filtered_4th_symbols_lock
.push(element_c);
}
}
}
}));
}
}
}
try_join_all(task_vec).await?;
// 4th filtering: RSI (length: 10, 30m close price) the current index should be lower than 30.
let filtered_4th_iter = filtered_4th_symbols_arc.lock().await.clone().into_iter();
let mut rsi10_30m_data: Vec<(String, Vec<RsiData>)> = Vec::new();
value_estimation_team::indicators::rsi::rsi(
10,
@ -1054,7 +1095,7 @@ pub async fn execute_strategist_for_test_temp(
.await?;
let mut rsi10_30m_vec: Vec<RsiData> = Vec::new();
let mut filtered_5th_symbols: Vec<(String, i64)> = Vec::new(); // (symbol, closetime)
for element in filtered_4th_symbols {
for element in filtered_4th_iter {
let rsi10_30m_option = rsi10_30m_data.iter().position(|x| *x.0 == element.0);
if rsi10_30m_option.is_some() {
@ -1080,19 +1121,19 @@ pub async fn execute_strategist_for_test_temp(
// 5th filtering: heatmap volume(MA length 10, std length 10, 30m close price), the current candle should be over than high at least.
let mut filtered_6th_symbols: Vec<(String, i64)> = Vec::new(); // (symbol, closetime)
for element in filtered_5th_symbols {
let opclo_30m_option = alldata
.rt_price_30m_vec
let rt_price_30m_vec_c = alldata.rt_price_30m_vec.clone();
let opclo_30m_option = rt_price_30m_vec_c
.iter()
.position(|x| *x.0 == element.0);
if opclo_30m_option.is_some() {
opclo_30m_vec = alldata.rt_price_30m_vec[opclo_30m_option.unwrap()]
let opclo_30m_vec = rt_price_30m_vec_c[opclo_30m_option.unwrap()]
.1
.clone();
if opclo_30m_vec.len() >= 3 {
let heatmap_volume_option = heatmap_volume(
&element.0,
&alldata.rt_price_30m_vec,
&rt_price_30m_vec_c,
10,
10,
4.0,
@ -1154,7 +1195,7 @@ pub async fn execute_strategist_for_test_temp(
// }
// }
insert_pre_suggested_coins(1, &filtered_6th_symbols, alldata).await;
// insert_pre_suggested_coins(1, &filtered_6th_symbols, alldata).await;
Ok(())
}