diff --git a/src/main.rs b/src/main.rs index 179bfba..cedbcb5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -119,7 +119,6 @@ async fn main() -> Result<(), Box> { // price per second data and channels let mut price_vec: Vec = Vec::new(); // (symbol, price) let (tx_price_vec, mut rx_price_vec) = watch::channel(price_vec); - let mut rx2_price_vec = rx_price_vec.clone(); let mut rx3_price_vec = rx_price_vec.clone(); let mut rx4_price_vec = rx_price_vec.clone(); let mut rx5_price_vec = rx_price_vec.clone(); @@ -791,6 +790,7 @@ async fn main() -> Result<(), Box> { // Task#4: price per second tokio::task::spawn(async move { + sleep(Duration::from_secs(20)).await; let client = ClientBuilder::new() .timeout(tokio::time::Duration::from_millis(1000)) .build() @@ -799,36 +799,126 @@ async fn main() -> Result<(), Box> { let mut elapsed_time = 0; loop { let instant = Instant::now(); - // let tx1_changed = rx1_7.changed().await; - // server_epoch = *rx1_7.borrow(); - // match tx1_changed { - // Ok(T) => { - // let mut price_vec_temp: Vec = Vec::new(); - // let result = request_others::request_all_coin_price(&client, &mut price_vec_temp).await; - - // match result { - // Ok(T) => { - // tx_price_vec.send_modify(|vec| *vec = price_vec_temp); - // tx_task4.send(4).expect("The mpsc channel has been closed."); - // } - // Err(E) => {} - // } - - // // monitors::monitoring_all_coin_profit_change().await; - // } - // Err(e) => {} - // } let mut price_vec_temp: Vec = Vec::new(); let result = request_others::request_all_coin_price(&client, &mut price_vec_temp).await; - + let mut price_vec_temp_c: Vec = Vec::new(); match result { Ok(T) => { + price_vec_temp_c = price_vec_temp.clone(); tx_price_vec.send_modify(|vec| *vec = price_vec_temp); tx_task4.send(4).expect("The mpsc channel has been closed."); } Err(E) => {} } + + // Task#10: make realtime price data of 1m, 30m, 1d, 1w and 1mon + let valid_usdt_trade_vec = rx_valid_usdt_trade_vec.borrow().clone(); + // 1m + let interval = String::from("1m"); + let candle_1m_vec = rx_candle_1m_vec.borrow().clone(); + let rt_price_1m_vec_read_temp: Vec<(String, Vec)> = + Vec::new(); + let mut rt_price_1m_vec_write_temp: Vec<(String, Vec)> = + Vec::new(); + let result = + value_estimation_team::datapoints::price_data::update_realtime_price_data( + &interval, + &candle_1m_vec, + &rt_price_1m_vec_read_temp, + &mut rt_price_1m_vec_write_temp, + &price_vec_temp_c, + &valid_usdt_trade_vec, + ) + .await; + + match result { + Ok(T) => { + tx_rt_price_1m_vec.send_modify(|vec| *vec = rt_price_1m_vec_write_temp); + tx_task10 + .send(10) + .expect("The mpsc channel has been closed."); + } + Err(E) => {} + } + + // 30m + let interval = String::from("30m"); + let candle_30m_vec = rx_candle_30m_vec.borrow().clone(); + let rt_price_1m_vec = rx_rt_price_1m_vec.borrow().clone(); + let mut rt_price_30m_vec_write_temp: Vec<(String, Vec)> = + Vec::new(); + + if !rt_price_1m_vec.is_empty() { + let result = value_estimation_team::datapoints::price_data::update_realtime_price_data(&interval, &candle_30m_vec, &rt_price_1m_vec, &mut rt_price_30m_vec_write_temp, &price_vec_temp_c, &valid_usdt_trade_vec).await; + + match result { + Ok(T) => { + tx_rt_price_30m_vec + .send_modify(|vec| *vec = rt_price_30m_vec_write_temp); + tx_task10 + .send(10) + .expect("The mpsc channel has been closed."); + } + Err(E) => {} + } + } + + // 1d + let interval = String::from("1d"); + let candle_1d_vec = rx_candle_1d_vec.borrow().clone(); + let rt_price_30m_vec = rx_rt_price_30m_vec.borrow().clone(); + let mut rt_price_1d_vec_write_temp: Vec<(String, Vec)> = + Vec::new(); + + if !rt_price_30m_vec.is_empty() { + let result = value_estimation_team::datapoints::price_data::update_realtime_price_data(&interval, &candle_1d_vec, &rt_price_30m_vec, &mut rt_price_1d_vec_write_temp, &price_vec_temp_c, &valid_usdt_trade_vec).await; + + match result { + Ok(T) => { + tx_rt_price_1d_vec + .send_modify(|vec| *vec = rt_price_1d_vec_write_temp); + tx_task10 + .send(10) + .expect("The mpsc channel has been closed."); + } + Err(E) => {} + } + } + + // { // 1w + // let interval = String::from("1w"); + // let candle_1w_vec = rx_candle_1w_vec.borrow().clone(); + // let rt_price_1d_vec = rx_rt_price_1d_vec.borrow().clone(); + // let mut rt_price_1w_vec_write_temp: Vec<(String, Vec)> = Vec::new(); + // let result = value_estimation_team::datapoints::price_data::update_realtime_price_data(&interval, &candle_1w_vec, &rt_price_1d_vec, &mut rt_price_1w_vec_write_temp, &price_vec_temp_c, &valid_usdt_trade_vec).await; + + // match result { + // Ok(T) => { + // tx_rt_price_1w_vec.send_modify(|vec| *vec = rt_price_1w_vec_write_temp); + // tx_task10.send(10).expect("The mpsc channel has been closed."); + // } + // Err(E) => {} + // } + // } + + // { // 1mon + // let interval = String::from("1mon"); + // let candle_1mon_vec = rx_candle_1mon_vec.borrow().clone(); + // let rt_price_1w_vec = rx_rt_price_1w_vec.borrow().clone(); + // let mut rt_price_1mon_vec_write_temp: Vec<(String, Vec)> = Vec::new(); + // let result = value_estimation_team::datapoints::price_data::update_realtime_price_data(&interval, &candle_1mon_vec, &rt_price_1w_vec, &mut rt_price_1mon_vec_write_temp, &price_vec_temp_c, &valid_usdt_trade_vec).await; + + // match result { + // Ok(T) => { + // tx_rt_price_1mon_vec.send_modify(|vec| *vec = rt_price_1mon_vec_write_temp); + // tx_task10.send(10).expect("The mpsc channel has been closed."); + // } + // Err(E) => {} + // } + // } + + // sleep as much as the loop recurs per 1 seconds if all operation finished within 1 seconds. elapsed_time = instant.elapsed().as_millis(); if 500 > elapsed_time { @@ -951,134 +1041,134 @@ async fn main() -> Result<(), Box> { // }); // Task#10: make realtime price data of 1m, 30m, 1d, 1w and 1mon - tokio::task::spawn(async move { - let mut elapsed_time = 0; - if RUNNING_MODE == REAL || RUNNING_MODE == SIMUL { - sleep(Duration::from_secs(30)).await; - } else { - sleep(Duration::from_secs(5)).await; - } + // tokio::task::spawn(async move { + // let mut elapsed_time = 0; + // if RUNNING_MODE == REAL || RUNNING_MODE == SIMUL { + // sleep(Duration::from_secs(30)).await; + // } else { + // sleep(Duration::from_secs(5)).await; + // } - loop { - let instant = Instant::now(); - let price_changed = rx_price_vec.changed().await; - if price_changed.is_ok() { - let price_vec = rx_price_vec.borrow().clone(); - let valid_usdt_trade_vec = rx_valid_usdt_trade_vec.borrow().clone(); + // loop { + // let instant = Instant::now(); + // let price_changed = rx_price_vec.changed().await; + // if price_changed.is_ok() { + // let price_vec = rx_price_vec.borrow().clone(); + // let valid_usdt_trade_vec = rx_valid_usdt_trade_vec.borrow().clone(); - if !price_vec.is_empty() { - // 1m - let interval = String::from("1m"); - let candle_1m_vec = rx_candle_1m_vec.borrow().clone(); - let rt_price_1m_vec_read_temp: Vec<(String, Vec)> = - Vec::new(); - let mut rt_price_1m_vec_write_temp: Vec<(String, Vec)> = - Vec::new(); - let result = - value_estimation_team::datapoints::price_data::update_realtime_price_data( - &interval, - &candle_1m_vec, - &rt_price_1m_vec_read_temp, - &mut rt_price_1m_vec_write_temp, - &price_vec, - &valid_usdt_trade_vec, - ) - .await; + // if !price_vec.is_empty() { + // // 1m + // let interval = String::from("1m"); + // let candle_1m_vec = rx_candle_1m_vec.borrow().clone(); + // let rt_price_1m_vec_read_temp: Vec<(String, Vec)> = + // Vec::new(); + // let mut rt_price_1m_vec_write_temp: Vec<(String, Vec)> = + // Vec::new(); + // let result = + // value_estimation_team::datapoints::price_data::update_realtime_price_data( + // &interval, + // &candle_1m_vec, + // &rt_price_1m_vec_read_temp, + // &mut rt_price_1m_vec_write_temp, + // &price_vec, + // &valid_usdt_trade_vec, + // ) + // .await; - match result { - Ok(T) => { - tx_rt_price_1m_vec.send_modify(|vec| *vec = rt_price_1m_vec_write_temp); - tx_task10 - .send(10) - .expect("The mpsc channel has been closed."); - } - Err(E) => {} - } + // match result { + // Ok(T) => { + // tx_rt_price_1m_vec.send_modify(|vec| *vec = rt_price_1m_vec_write_temp); + // tx_task10 + // .send(10) + // .expect("The mpsc channel has been closed."); + // } + // Err(E) => {} + // } - // 30m - let interval = String::from("30m"); - let candle_30m_vec = rx_candle_30m_vec.borrow().clone(); - let rt_price_1m_vec = rx_rt_price_1m_vec.borrow().clone(); - let mut rt_price_30m_vec_write_temp: Vec<(String, Vec)> = - Vec::new(); + // // 30m + // let interval = String::from("30m"); + // let candle_30m_vec = rx_candle_30m_vec.borrow().clone(); + // let rt_price_1m_vec = rx_rt_price_1m_vec.borrow().clone(); + // let mut rt_price_30m_vec_write_temp: Vec<(String, Vec)> = + // Vec::new(); - if !rt_price_1m_vec.is_empty() { - let result = value_estimation_team::datapoints::price_data::update_realtime_price_data(&interval, &candle_30m_vec, &rt_price_1m_vec, &mut rt_price_30m_vec_write_temp, &price_vec, &valid_usdt_trade_vec).await; + // if !rt_price_1m_vec.is_empty() { + // let result = value_estimation_team::datapoints::price_data::update_realtime_price_data(&interval, &candle_30m_vec, &rt_price_1m_vec, &mut rt_price_30m_vec_write_temp, &price_vec, &valid_usdt_trade_vec).await; - match result { - Ok(T) => { - tx_rt_price_30m_vec - .send_modify(|vec| *vec = rt_price_30m_vec_write_temp); - tx_task10 - .send(10) - .expect("The mpsc channel has been closed."); - } - Err(E) => {} - } - } + // match result { + // Ok(T) => { + // tx_rt_price_30m_vec + // .send_modify(|vec| *vec = rt_price_30m_vec_write_temp); + // tx_task10 + // .send(10) + // .expect("The mpsc channel has been closed."); + // } + // Err(E) => {} + // } + // } - // 1d - let interval = String::from("1d"); - let candle_1d_vec = rx_candle_1d_vec.borrow().clone(); - let rt_price_30m_vec = rx_rt_price_30m_vec.borrow().clone(); - let mut rt_price_1d_vec_write_temp: Vec<(String, Vec)> = - Vec::new(); + // // 1d + // let interval = String::from("1d"); + // let candle_1d_vec = rx_candle_1d_vec.borrow().clone(); + // let rt_price_30m_vec = rx_rt_price_30m_vec.borrow().clone(); + // let mut rt_price_1d_vec_write_temp: Vec<(String, Vec)> = + // Vec::new(); - if !rt_price_30m_vec.is_empty() { - let result = value_estimation_team::datapoints::price_data::update_realtime_price_data(&interval, &candle_1d_vec, &rt_price_30m_vec, &mut rt_price_1d_vec_write_temp, &price_vec, &valid_usdt_trade_vec).await; + // if !rt_price_30m_vec.is_empty() { + // let result = value_estimation_team::datapoints::price_data::update_realtime_price_data(&interval, &candle_1d_vec, &rt_price_30m_vec, &mut rt_price_1d_vec_write_temp, &price_vec, &valid_usdt_trade_vec).await; - match result { - Ok(T) => { - tx_rt_price_1d_vec - .send_modify(|vec| *vec = rt_price_1d_vec_write_temp); - tx_task10 - .send(10) - .expect("The mpsc channel has been closed."); - } - Err(E) => {} - } - } + // match result { + // Ok(T) => { + // tx_rt_price_1d_vec + // .send_modify(|vec| *vec = rt_price_1d_vec_write_temp); + // tx_task10 + // .send(10) + // .expect("The mpsc channel has been closed."); + // } + // Err(E) => {} + // } + // } - // { // 1w - // let interval = String::from("1w"); - // let candle_1w_vec = rx_candle_1w_vec.borrow().clone(); - // let rt_price_1d_vec = rx_rt_price_1d_vec.borrow().clone(); - // let mut rt_price_1w_vec_write_temp: Vec<(String, Vec)> = Vec::new(); - // let result = value_estimation_team::datapoints::price_data::update_realtime_price_data(&interval, &candle_1w_vec, &rt_price_1d_vec, &mut rt_price_1w_vec_write_temp, &price_vec, &valid_usdt_trade_vec).await; + // // { // 1w + // // let interval = String::from("1w"); + // // let candle_1w_vec = rx_candle_1w_vec.borrow().clone(); + // // let rt_price_1d_vec = rx_rt_price_1d_vec.borrow().clone(); + // // let mut rt_price_1w_vec_write_temp: Vec<(String, Vec)> = Vec::new(); + // // let result = value_estimation_team::datapoints::price_data::update_realtime_price_data(&interval, &candle_1w_vec, &rt_price_1d_vec, &mut rt_price_1w_vec_write_temp, &price_vec, &valid_usdt_trade_vec).await; - // match result { - // Ok(T) => { - // tx_rt_price_1w_vec.send_modify(|vec| *vec = rt_price_1w_vec_write_temp); - // tx_task10.send(10).expect("The mpsc channel has been closed."); - // } - // Err(E) => {} - // } - // } + // // match result { + // // Ok(T) => { + // // tx_rt_price_1w_vec.send_modify(|vec| *vec = rt_price_1w_vec_write_temp); + // // tx_task10.send(10).expect("The mpsc channel has been closed."); + // // } + // // Err(E) => {} + // // } + // // } - // { // 1mon - // let interval = String::from("1mon"); - // let candle_1mon_vec = rx_candle_1mon_vec.borrow().clone(); - // let rt_price_1w_vec = rx_rt_price_1w_vec.borrow().clone(); - // let mut rt_price_1mon_vec_write_temp: Vec<(String, Vec)> = Vec::new(); - // let result = value_estimation_team::datapoints::price_data::update_realtime_price_data(&interval, &candle_1mon_vec, &rt_price_1w_vec, &mut rt_price_1mon_vec_write_temp, &price_vec, &valid_usdt_trade_vec).await; + // // { // 1mon + // // let interval = String::from("1mon"); + // // let candle_1mon_vec = rx_candle_1mon_vec.borrow().clone(); + // // let rt_price_1w_vec = rx_rt_price_1w_vec.borrow().clone(); + // // let mut rt_price_1mon_vec_write_temp: Vec<(String, Vec)> = Vec::new(); + // // let result = value_estimation_team::datapoints::price_data::update_realtime_price_data(&interval, &candle_1mon_vec, &rt_price_1w_vec, &mut rt_price_1mon_vec_write_temp, &price_vec, &valid_usdt_trade_vec).await; - // match result { - // Ok(T) => { - // tx_rt_price_1mon_vec.send_modify(|vec| *vec = rt_price_1mon_vec_write_temp); - // tx_task10.send(10).expect("The mpsc channel has been closed."); - // } - // Err(E) => {} - // } - // } - } - } - // sleep as much as the loop recurs per 1 second if all operation finished within 1 second. - elapsed_time = instant.elapsed().as_millis(); - if 1_000 > elapsed_time { - sleep(Duration::from_millis((1_000 - elapsed_time) as u64)).await; - } - } - }); + // // match result { + // // Ok(T) => { + // // tx_rt_price_1mon_vec.send_modify(|vec| *vec = rt_price_1mon_vec_write_temp); + // // tx_task10.send(10).expect("The mpsc channel has been closed."); + // // } + // // Err(E) => {} + // // } + // // } + // } + // } + // // sleep as much as the loop recurs per 1 second if all operation finished within 1 second. + // elapsed_time = instant.elapsed().as_millis(); + // if 250 > elapsed_time { + // sleep(Duration::from_millis((250 - elapsed_time) as u64)).await; + // } + // } + // }); // Task#11 // SMA, EMA, RSI, Stoch RSI, BollingerBand (3, 10, 30) for candle 1m, 30m, 1d, 1w, and 1mon @@ -1684,7 +1774,6 @@ async fn main() -> Result<(), Box> { let mut elapsed_time = 0; loop { let instant = Instant::now(); - all_data.price_vec = rx2_price_vec.borrow().clone(); all_data.valid_symbol_vec = rx3_valid_usdt_trade_vec.borrow().clone(); // realtime price data @@ -1776,7 +1865,9 @@ async fn main() -> Result<(), Box> { .send(18) .expect("The mpsc channel has been closed."); } - Err(E) => {} + Err(E) => { + println!("Task #18 Error"); + } } // sleep as much as the loop recurs per 1 second if all operation finished within 1 second. @@ -1793,8 +1884,6 @@ async fn main() -> Result<(), Box> { let mut all_data = AllData::new(); loop { let instant = Instant::now(); - - all_data.price_vec = rx2_price_vec.borrow().clone(); all_data.valid_symbol_vec = rx3_valid_usdt_trade_vec.borrow().clone(); // realtime price data all_data.rt_price_1m_vec = rx3_rt_price_1m_vec.borrow().clone();