Move Task#10 into task#4

This commit is contained in:
Sik Yoon 2023-08-02 23:56:45 +09:00
parent fdc7c887e5
commit dd37415e1d

View File

@ -119,7 +119,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// price per second data and channels // price per second data and channels
let mut price_vec: Vec<CoinPriceData> = Vec::new(); // (symbol, price) let mut price_vec: Vec<CoinPriceData> = Vec::new(); // (symbol, price)
let (tx_price_vec, mut rx_price_vec) = watch::channel(price_vec); let (tx_price_vec, mut rx_price_vec) = watch::channel(price_vec);
let mut rx2_price_vec = rx_price_vec.clone();
let mut rx3_price_vec = rx_price_vec.clone(); let mut rx3_price_vec = rx_price_vec.clone();
let mut rx4_price_vec = rx_price_vec.clone(); let mut rx4_price_vec = rx_price_vec.clone();
let mut rx5_price_vec = rx_price_vec.clone(); let mut rx5_price_vec = rx_price_vec.clone();
@ -791,6 +790,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Task#4: price per second // Task#4: price per second
tokio::task::spawn(async move { tokio::task::spawn(async move {
sleep(Duration::from_secs(20)).await;
let client = ClientBuilder::new() let client = ClientBuilder::new()
.timeout(tokio::time::Duration::from_millis(1000)) .timeout(tokio::time::Duration::from_millis(1000))
.build() .build()
@ -799,36 +799,126 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut elapsed_time = 0; let mut elapsed_time = 0;
loop { loop {
let instant = Instant::now(); let instant = Instant::now();
// let tx1_changed = rx1_7.changed().await;
// server_epoch = *rx1_7.borrow();
// match tx1_changed {
// Ok(T) => {
// let mut price_vec_temp: Vec<CoinPriceData> = Vec::new();
// let result = request_others::request_all_coin_price(&client, &mut price_vec_temp).await;
// match result {
// Ok(T) => {
// tx_price_vec.send_modify(|vec| *vec = price_vec_temp);
// tx_task4.send(4).expect("The mpsc channel has been closed.");
// }
// Err(E) => {}
// }
// // monitors::monitoring_all_coin_profit_change().await;
// }
// Err(e) => {}
// }
let mut price_vec_temp: Vec<CoinPriceData> = Vec::new(); let mut price_vec_temp: Vec<CoinPriceData> = Vec::new();
let result = request_others::request_all_coin_price(&client, &mut price_vec_temp).await; let result = request_others::request_all_coin_price(&client, &mut price_vec_temp).await;
let mut price_vec_temp_c: Vec<CoinPriceData> = Vec::new();
match result { match result {
Ok(T) => { Ok(T) => {
price_vec_temp_c = price_vec_temp.clone();
tx_price_vec.send_modify(|vec| *vec = price_vec_temp); tx_price_vec.send_modify(|vec| *vec = price_vec_temp);
tx_task4.send(4).expect("The mpsc channel has been closed."); tx_task4.send(4).expect("The mpsc channel has been closed.");
} }
Err(E) => {} Err(E) => {}
} }
// Task#10: make realtime price data of 1m, 30m, 1d, 1w and 1mon
let valid_usdt_trade_vec = rx_valid_usdt_trade_vec.borrow().clone();
// 1m
let interval = String::from("1m");
let candle_1m_vec = rx_candle_1m_vec.borrow().clone();
let rt_price_1m_vec_read_temp: Vec<(String, Vec<RealtimePriceData>)> =
Vec::new();
let mut rt_price_1m_vec_write_temp: Vec<(String, Vec<RealtimePriceData>)> =
Vec::new();
let result =
value_estimation_team::datapoints::price_data::update_realtime_price_data(
&interval,
&candle_1m_vec,
&rt_price_1m_vec_read_temp,
&mut rt_price_1m_vec_write_temp,
&price_vec_temp_c,
&valid_usdt_trade_vec,
)
.await;
match result {
Ok(T) => {
tx_rt_price_1m_vec.send_modify(|vec| *vec = rt_price_1m_vec_write_temp);
tx_task10
.send(10)
.expect("The mpsc channel has been closed.");
}
Err(E) => {}
}
// 30m
let interval = String::from("30m");
let candle_30m_vec = rx_candle_30m_vec.borrow().clone();
let rt_price_1m_vec = rx_rt_price_1m_vec.borrow().clone();
let mut rt_price_30m_vec_write_temp: Vec<(String, Vec<RealtimePriceData>)> =
Vec::new();
if !rt_price_1m_vec.is_empty() {
let result = value_estimation_team::datapoints::price_data::update_realtime_price_data(&interval, &candle_30m_vec, &rt_price_1m_vec, &mut rt_price_30m_vec_write_temp, &price_vec_temp_c, &valid_usdt_trade_vec).await;
match result {
Ok(T) => {
tx_rt_price_30m_vec
.send_modify(|vec| *vec = rt_price_30m_vec_write_temp);
tx_task10
.send(10)
.expect("The mpsc channel has been closed.");
}
Err(E) => {}
}
}
// 1d
let interval = String::from("1d");
let candle_1d_vec = rx_candle_1d_vec.borrow().clone();
let rt_price_30m_vec = rx_rt_price_30m_vec.borrow().clone();
let mut rt_price_1d_vec_write_temp: Vec<(String, Vec<RealtimePriceData>)> =
Vec::new();
if !rt_price_30m_vec.is_empty() {
let result = value_estimation_team::datapoints::price_data::update_realtime_price_data(&interval, &candle_1d_vec, &rt_price_30m_vec, &mut rt_price_1d_vec_write_temp, &price_vec_temp_c, &valid_usdt_trade_vec).await;
match result {
Ok(T) => {
tx_rt_price_1d_vec
.send_modify(|vec| *vec = rt_price_1d_vec_write_temp);
tx_task10
.send(10)
.expect("The mpsc channel has been closed.");
}
Err(E) => {}
}
}
// { // 1w
// let interval = String::from("1w");
// let candle_1w_vec = rx_candle_1w_vec.borrow().clone();
// let rt_price_1d_vec = rx_rt_price_1d_vec.borrow().clone();
// let mut rt_price_1w_vec_write_temp: Vec<(String, Vec<RealtimePriceData>)> = Vec::new();
// let result = value_estimation_team::datapoints::price_data::update_realtime_price_data(&interval, &candle_1w_vec, &rt_price_1d_vec, &mut rt_price_1w_vec_write_temp, &price_vec_temp_c, &valid_usdt_trade_vec).await;
// match result {
// Ok(T) => {
// tx_rt_price_1w_vec.send_modify(|vec| *vec = rt_price_1w_vec_write_temp);
// tx_task10.send(10).expect("The mpsc channel has been closed.");
// }
// Err(E) => {}
// }
// }
// { // 1mon
// let interval = String::from("1mon");
// let candle_1mon_vec = rx_candle_1mon_vec.borrow().clone();
// let rt_price_1w_vec = rx_rt_price_1w_vec.borrow().clone();
// let mut rt_price_1mon_vec_write_temp: Vec<(String, Vec<RealtimePriceData>)> = Vec::new();
// let result = value_estimation_team::datapoints::price_data::update_realtime_price_data(&interval, &candle_1mon_vec, &rt_price_1w_vec, &mut rt_price_1mon_vec_write_temp, &price_vec_temp_c, &valid_usdt_trade_vec).await;
// match result {
// Ok(T) => {
// tx_rt_price_1mon_vec.send_modify(|vec| *vec = rt_price_1mon_vec_write_temp);
// tx_task10.send(10).expect("The mpsc channel has been closed.");
// }
// Err(E) => {}
// }
// }
// sleep as much as the loop recurs per 1 seconds if all operation finished within 1 seconds. // sleep as much as the loop recurs per 1 seconds if all operation finished within 1 seconds.
elapsed_time = instant.elapsed().as_millis(); elapsed_time = instant.elapsed().as_millis();
if 500 > elapsed_time { if 500 > elapsed_time {
@ -951,134 +1041,134 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// }); // });
// Task#10: make realtime price data of 1m, 30m, 1d, 1w and 1mon // Task#10: make realtime price data of 1m, 30m, 1d, 1w and 1mon
tokio::task::spawn(async move { // tokio::task::spawn(async move {
let mut elapsed_time = 0; // let mut elapsed_time = 0;
if RUNNING_MODE == REAL || RUNNING_MODE == SIMUL { // if RUNNING_MODE == REAL || RUNNING_MODE == SIMUL {
sleep(Duration::from_secs(30)).await; // sleep(Duration::from_secs(30)).await;
} else { // } else {
sleep(Duration::from_secs(5)).await; // sleep(Duration::from_secs(5)).await;
} // }
loop { // loop {
let instant = Instant::now(); // let instant = Instant::now();
let price_changed = rx_price_vec.changed().await; // let price_changed = rx_price_vec.changed().await;
if price_changed.is_ok() { // if price_changed.is_ok() {
let price_vec = rx_price_vec.borrow().clone(); // let price_vec = rx_price_vec.borrow().clone();
let valid_usdt_trade_vec = rx_valid_usdt_trade_vec.borrow().clone(); // let valid_usdt_trade_vec = rx_valid_usdt_trade_vec.borrow().clone();
if !price_vec.is_empty() { // if !price_vec.is_empty() {
// 1m // // 1m
let interval = String::from("1m"); // let interval = String::from("1m");
let candle_1m_vec = rx_candle_1m_vec.borrow().clone(); // let candle_1m_vec = rx_candle_1m_vec.borrow().clone();
let rt_price_1m_vec_read_temp: Vec<(String, Vec<RealtimePriceData>)> = // let rt_price_1m_vec_read_temp: Vec<(String, Vec<RealtimePriceData>)> =
Vec::new(); // Vec::new();
let mut rt_price_1m_vec_write_temp: Vec<(String, Vec<RealtimePriceData>)> = // let mut rt_price_1m_vec_write_temp: Vec<(String, Vec<RealtimePriceData>)> =
Vec::new(); // Vec::new();
let result = // let result =
value_estimation_team::datapoints::price_data::update_realtime_price_data( // value_estimation_team::datapoints::price_data::update_realtime_price_data(
&interval, // &interval,
&candle_1m_vec, // &candle_1m_vec,
&rt_price_1m_vec_read_temp, // &rt_price_1m_vec_read_temp,
&mut rt_price_1m_vec_write_temp, // &mut rt_price_1m_vec_write_temp,
&price_vec, // &price_vec,
&valid_usdt_trade_vec, // &valid_usdt_trade_vec,
) // )
.await; // .await;
match result { // match result {
Ok(T) => { // Ok(T) => {
tx_rt_price_1m_vec.send_modify(|vec| *vec = rt_price_1m_vec_write_temp); // tx_rt_price_1m_vec.send_modify(|vec| *vec = rt_price_1m_vec_write_temp);
tx_task10 // tx_task10
.send(10) // .send(10)
.expect("The mpsc channel has been closed."); // .expect("The mpsc channel has been closed.");
} // }
Err(E) => {} // Err(E) => {}
} // }
// 30m // // 30m
let interval = String::from("30m"); // let interval = String::from("30m");
let candle_30m_vec = rx_candle_30m_vec.borrow().clone(); // let candle_30m_vec = rx_candle_30m_vec.borrow().clone();
let rt_price_1m_vec = rx_rt_price_1m_vec.borrow().clone(); // let rt_price_1m_vec = rx_rt_price_1m_vec.borrow().clone();
let mut rt_price_30m_vec_write_temp: Vec<(String, Vec<RealtimePriceData>)> = // let mut rt_price_30m_vec_write_temp: Vec<(String, Vec<RealtimePriceData>)> =
Vec::new(); // Vec::new();
if !rt_price_1m_vec.is_empty() { // if !rt_price_1m_vec.is_empty() {
let result = value_estimation_team::datapoints::price_data::update_realtime_price_data(&interval, &candle_30m_vec, &rt_price_1m_vec, &mut rt_price_30m_vec_write_temp, &price_vec, &valid_usdt_trade_vec).await; // let result = value_estimation_team::datapoints::price_data::update_realtime_price_data(&interval, &candle_30m_vec, &rt_price_1m_vec, &mut rt_price_30m_vec_write_temp, &price_vec, &valid_usdt_trade_vec).await;
match result { // match result {
Ok(T) => { // Ok(T) => {
tx_rt_price_30m_vec // tx_rt_price_30m_vec
.send_modify(|vec| *vec = rt_price_30m_vec_write_temp); // .send_modify(|vec| *vec = rt_price_30m_vec_write_temp);
tx_task10 // tx_task10
.send(10) // .send(10)
.expect("The mpsc channel has been closed."); // .expect("The mpsc channel has been closed.");
} // }
Err(E) => {} // Err(E) => {}
} // }
} // }
// 1d // // 1d
let interval = String::from("1d"); // let interval = String::from("1d");
let candle_1d_vec = rx_candle_1d_vec.borrow().clone(); // let candle_1d_vec = rx_candle_1d_vec.borrow().clone();
let rt_price_30m_vec = rx_rt_price_30m_vec.borrow().clone(); // let rt_price_30m_vec = rx_rt_price_30m_vec.borrow().clone();
let mut rt_price_1d_vec_write_temp: Vec<(String, Vec<RealtimePriceData>)> = // let mut rt_price_1d_vec_write_temp: Vec<(String, Vec<RealtimePriceData>)> =
Vec::new(); // Vec::new();
if !rt_price_30m_vec.is_empty() { // if !rt_price_30m_vec.is_empty() {
let result = value_estimation_team::datapoints::price_data::update_realtime_price_data(&interval, &candle_1d_vec, &rt_price_30m_vec, &mut rt_price_1d_vec_write_temp, &price_vec, &valid_usdt_trade_vec).await; // let result = value_estimation_team::datapoints::price_data::update_realtime_price_data(&interval, &candle_1d_vec, &rt_price_30m_vec, &mut rt_price_1d_vec_write_temp, &price_vec, &valid_usdt_trade_vec).await;
match result { // match result {
Ok(T) => { // Ok(T) => {
tx_rt_price_1d_vec // tx_rt_price_1d_vec
.send_modify(|vec| *vec = rt_price_1d_vec_write_temp); // .send_modify(|vec| *vec = rt_price_1d_vec_write_temp);
tx_task10 // tx_task10
.send(10) // .send(10)
.expect("The mpsc channel has been closed."); // .expect("The mpsc channel has been closed.");
} // }
Err(E) => {} // Err(E) => {}
} // }
} // }
// { // 1w // // { // 1w
// let interval = String::from("1w"); // // let interval = String::from("1w");
// let candle_1w_vec = rx_candle_1w_vec.borrow().clone(); // // let candle_1w_vec = rx_candle_1w_vec.borrow().clone();
// let rt_price_1d_vec = rx_rt_price_1d_vec.borrow().clone(); // // let rt_price_1d_vec = rx_rt_price_1d_vec.borrow().clone();
// let mut rt_price_1w_vec_write_temp: Vec<(String, Vec<RealtimePriceData>)> = Vec::new(); // // let mut rt_price_1w_vec_write_temp: Vec<(String, Vec<RealtimePriceData>)> = Vec::new();
// let result = value_estimation_team::datapoints::price_data::update_realtime_price_data(&interval, &candle_1w_vec, &rt_price_1d_vec, &mut rt_price_1w_vec_write_temp, &price_vec, &valid_usdt_trade_vec).await; // // let result = value_estimation_team::datapoints::price_data::update_realtime_price_data(&interval, &candle_1w_vec, &rt_price_1d_vec, &mut rt_price_1w_vec_write_temp, &price_vec, &valid_usdt_trade_vec).await;
// match result { // // match result {
// Ok(T) => { // // Ok(T) => {
// tx_rt_price_1w_vec.send_modify(|vec| *vec = rt_price_1w_vec_write_temp); // // tx_rt_price_1w_vec.send_modify(|vec| *vec = rt_price_1w_vec_write_temp);
// tx_task10.send(10).expect("The mpsc channel has been closed."); // // tx_task10.send(10).expect("The mpsc channel has been closed.");
// } // // }
// Err(E) => {} // // Err(E) => {}
// } // // }
// } // // }
// { // 1mon // // { // 1mon
// let interval = String::from("1mon"); // // let interval = String::from("1mon");
// let candle_1mon_vec = rx_candle_1mon_vec.borrow().clone(); // // let candle_1mon_vec = rx_candle_1mon_vec.borrow().clone();
// let rt_price_1w_vec = rx_rt_price_1w_vec.borrow().clone(); // // let rt_price_1w_vec = rx_rt_price_1w_vec.borrow().clone();
// let mut rt_price_1mon_vec_write_temp: Vec<(String, Vec<RealtimePriceData>)> = Vec::new(); // // let mut rt_price_1mon_vec_write_temp: Vec<(String, Vec<RealtimePriceData>)> = Vec::new();
// let result = value_estimation_team::datapoints::price_data::update_realtime_price_data(&interval, &candle_1mon_vec, &rt_price_1w_vec, &mut rt_price_1mon_vec_write_temp, &price_vec, &valid_usdt_trade_vec).await; // // let result = value_estimation_team::datapoints::price_data::update_realtime_price_data(&interval, &candle_1mon_vec, &rt_price_1w_vec, &mut rt_price_1mon_vec_write_temp, &price_vec, &valid_usdt_trade_vec).await;
// match result { // // match result {
// Ok(T) => { // // Ok(T) => {
// tx_rt_price_1mon_vec.send_modify(|vec| *vec = rt_price_1mon_vec_write_temp); // // tx_rt_price_1mon_vec.send_modify(|vec| *vec = rt_price_1mon_vec_write_temp);
// tx_task10.send(10).expect("The mpsc channel has been closed."); // // tx_task10.send(10).expect("The mpsc channel has been closed.");
// } // // }
// Err(E) => {} // // Err(E) => {}
// } // // }
// } // // }
} // }
} // }
// sleep as much as the loop recurs per 1 second if all operation finished within 1 second. // // sleep as much as the loop recurs per 1 second if all operation finished within 1 second.
elapsed_time = instant.elapsed().as_millis(); // elapsed_time = instant.elapsed().as_millis();
if 1_000 > elapsed_time { // if 250 > elapsed_time {
sleep(Duration::from_millis((1_000 - elapsed_time) as u64)).await; // sleep(Duration::from_millis((250 - elapsed_time) as u64)).await;
} // }
} // }
}); // });
// Task#11 // Task#11
// SMA, EMA, RSI, Stoch RSI, BollingerBand (3, 10, 30) for candle 1m, 30m, 1d, 1w, and 1mon // SMA, EMA, RSI, Stoch RSI, BollingerBand (3, 10, 30) for candle 1m, 30m, 1d, 1w, and 1mon
@ -1684,7 +1774,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut elapsed_time = 0; let mut elapsed_time = 0;
loop { loop {
let instant = Instant::now(); let instant = Instant::now();
all_data.price_vec = rx2_price_vec.borrow().clone();
all_data.valid_symbol_vec = rx3_valid_usdt_trade_vec.borrow().clone(); all_data.valid_symbol_vec = rx3_valid_usdt_trade_vec.borrow().clone();
// realtime price data // realtime price data
@ -1776,7 +1865,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.send(18) .send(18)
.expect("The mpsc channel has been closed."); .expect("The mpsc channel has been closed.");
} }
Err(E) => {} Err(E) => {
println!("Task #18 Error");
}
} }
// sleep as much as the loop recurs per 1 second if all operation finished within 1 second. // sleep as much as the loop recurs per 1 second if all operation finished within 1 second.
@ -1793,8 +1884,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut all_data = AllData::new(); let mut all_data = AllData::new();
loop { loop {
let instant = Instant::now(); let instant = Instant::now();
all_data.price_vec = rx2_price_vec.borrow().clone();
all_data.valid_symbol_vec = rx3_valid_usdt_trade_vec.borrow().clone(); all_data.valid_symbol_vec = rx3_valid_usdt_trade_vec.borrow().clone();
// realtime price data // realtime price data
all_data.rt_price_1m_vec = rx3_rt_price_1m_vec.borrow().clone(); all_data.rt_price_1m_vec = rx3_rt_price_1m_vec.borrow().clone();