#![allow(unused)] #![allow(warnings)] use crate::coin_health_check_team::*; use crate::request_candles::CandleData; use crate::request_others::{ExchangeInfo, TradeFee}; use crate::server_health_check_team::ServerHealth; use crate::strategy_team::AllData; use crate::time_checking_team::UserTime; use crate::value_estimation_team::datapoints::price_data::RealtimePriceData; use clap::{arg, Command}; use log::Level; use reqwest::{Client, ClientBuilder}; use rust_decimal::Decimal; use simple_logger::set_up_color_terminal; use sqlx::{mysql::*, Connection, Executor, FromRow, Row}; use tradingbot::future::{FuturesExchangeInfo, FuturesTradeFee}; use std::collections::{HashMap, HashSet}; use std::{ io::{self, Write}, sync::Arc, }; use tokio::{fs::*, join, sync::mpsc, sync::watch, sync::Mutex, task::*, time::*}; use tradingbot::{RunningMode::*, *}; #[tokio::main] async fn main() -> Result<(), Box> { // future last price data let mut future_price_map: HashMap = HashMap::new(); // let (tx_future_price_map, mut rx_future_price_map) = watch::channel(future_price_map); let mut rx2_future_price_map = rx_future_price_map.clone(); let mut futures_exchange_info_map: HashMap = HashMap::new(); let (tx_futures_exchange_info, mut rx_futures_exchange_info) = watch::channel(futures_exchange_info_map); let mut rx2_futures_exchange_info = rx_futures_exchange_info.clone(); let mut rx3_futures_exchange_info = rx_futures_exchange_info.clone(); let mut futures_trade_fee = FuturesTradeFee::new(); let (tx_futures_trade_fee, mut rx_futures_trade_fee) = watch::channel(futures_trade_fee); let mut rx2_futures_trade_fee = rx_futures_trade_fee.clone(); let mut rx3_futures_trade_fee = rx_futures_trade_fee.clone(); // parse argument and set program preference program_setting(); // Datebase initialization initialization::initialization().await; // // let c1 = vec!(("name", "Shima Nabil")); // // println!("{:?}", database_control::update_record(&table_name, &c1, &("grade", "3")).await); // //database_control::exists_table(&table_name).await; // // let table_name = String::from("all24hstatistics"); // // let result = database_control::count_rows(&table_name).await; // // println!("{:?}", result); // // let rows = sqlx::query("select * from serverhealth") // // .map(|r| { // // a.id = r.get::("id"); // // a.waiting_maximum = r.get::("waiting_maximum"); // // a.server_on = r.get::("server_on"); // // a.ping_on = r.get::("ping_on"); // // a.wallet_system_on = r.get::("wallet_system_on"); // // // // }) // // .collect::>() // // .fetch(&pool).await; // // let str_result = rows // // .iter() // // .map(|r| format!("{} {}", r.get::("id"), r.get::("server_on"))) // // .collect::>() // // .join(", "); // // println!("{:?}", rows); // // let (tx_task1, mut rx_task0) = mpsc::unbounded_channel::(); let tx_task2 = tx_task1.clone(); // for Task#2 let tx_task3 = tx_task1.clone(); // for Task#3 let tx_task4 = tx_task1.clone(); // for Task#4 let tx_task5 = tx_task1.clone(); // for Task#5 let tx_task6 = tx_task1.clone(); // for Task#6 let tx_task7 = tx_task1.clone(); // for Task#7 let tx_task8 = tx_task1.clone(); // for Task#8 let tx_task9 = tx_task1.clone(); // for Task#9 let tx_task10 = tx_task1.clone(); // for Task#10 let tx_task11 = tx_task1.clone(); // for Task#11 let tx_task12 = tx_task1.clone(); // for Task#12 let tx_task13 = tx_task1.clone(); // for Task#13 let tx_task14 = tx_task1.clone(); // for Task#14 let tx_task15 = tx_task1.clone(); // for Task#15 let tx_task16 = tx_task1.clone(); // for Task#16 let tx_task17 = tx_task1.clone(); // for Task#17 let tx_task18 = tx_task1.clone(); // for Task#18 let tx_task19 = tx_task1.clone(); // for Task#19 let tx_task20 = tx_task1.clone(); // for Task#20 let tx_task21 = tx_task1.clone(); // for Task#21 let tx_task22 = tx_task1.clone(); // for Task#22 let tx_task23 = tx_task1.clone(); // for Task#23 let tx_task24 = tx_task1.clone(); // for Task#24 let tx_task25 = tx_task1.clone(); // for Task#25 let tx_task26 = tx_task1.clone(); // for Task#26 let tx_task27 = tx_task1.clone(); // for Task#27 let (local_epoch_tx1, mut local_epoch_rx1) = watch::channel(0); // local_epoch let (epoch_difference_tx1, mut epoch_difference_rx1) = watch::channel(0); // epoch_difference let (local_epoch_tx2, mut local_epoch_rx2) = watch::channel(0); // local_epoch let (epoch_difference_tx2, mut epoch_difference_rx2) = watch::channel(0); // epoch_difference // trade fee data let mut tradefee_map: HashMap = HashMap::new(); // let (tx_tradefee_map, mut rx_tradefee_map) = watch::channel(tradefee_map); let mut rx2_tradefee_map = rx_tradefee_map.clone(); let mut rx3_tradefee_map = rx_tradefee_map.clone(); let mut rx4_tradefee_map = rx_tradefee_map.clone(); let mut rx5_tradefee_map = rx_tradefee_map.clone(); let mut tradefee_map_capacity = rx_tradefee_map.clone(); // valid usdt trade data let mut valid_usdt_trade_set: HashSet = HashSet::new(); // symbol let (tx_valid_usdt_trade_set, mut rx_valid_usdt_trade_set) = watch::channel(valid_usdt_trade_set); let mut rx2_valid_usdt_trade_set = rx_valid_usdt_trade_set.clone(); let mut rx3_valid_usdt_trade_set = rx_valid_usdt_trade_set.clone(); let mut rx4_valid_usdt_trade_set = rx_valid_usdt_trade_set.clone(); let mut valid_usdt_trade_set_capacity = rx_valid_usdt_trade_set.clone(); // price per second data and channels let mut price_map: HashMap = HashMap::new(); // let (tx_price_map, mut rx_price_map) = watch::channel(price_map); let mut rx3_price_map = rx_price_map.clone(); let mut rx5_price_map = rx_price_map.clone(); let mut price_map_capacity = rx_price_map.clone(); // candle data from endpoint and channels let mut candle_1m_map: HashMap> = HashMap::new(); // > let (tx_candle_1m_map, mut rx_candle_1m_map) = watch::channel(candle_1m_map); let mut candle_1m_map_capacity = rx_candle_1m_map.clone(); let mut candle_30m_map: HashMap> = HashMap::new(); let (tx_candle_30m_map, mut rx_candle_30m_map) = watch::channel(candle_30m_map); let mut candle_30m_map_capacity = rx_candle_30m_map.clone(); let mut candle_1d_map: HashMap> = HashMap::new(); let (tx_candle_1d_map, mut rx_candle_1d_map) = watch::channel(candle_1d_map); let mut candle_1d_map_capacity = rx_candle_1d_map.clone(); let mut candle_1w_map: HashMap> = HashMap::new(); let (tx_candle_1w_map, mut rx_candle_1w_map) = watch::channel(candle_1w_map); let mut candle_1w_map_capacity = rx_candle_1w_map.clone(); let mut candle_1mon_map: HashMap> = HashMap::new(); let (tx_candle_1mon_map, mut rx_candle_1mon_map) = watch::channel(candle_1mon_map); let mut candle_1mon_map_capacity = rx_candle_1mon_map.clone(); // real-time reflected price data and channels let mut rt_price_1m_map: HashMap> = HashMap::new(); // > let (tx_rt_price_1m_map, mut rx_rt_price_1m_map) = watch::channel(rt_price_1m_map); let mut rx2_rt_price_1m_map = rx_rt_price_1m_map.clone(); let mut rx3_rt_price_1m_map = rx_rt_price_1m_map.clone(); let mut rx4_rt_price_1m_map = rx_rt_price_1m_map.clone(); let mut rx5_rt_price_1m_map = rx_rt_price_1m_map.clone(); let mut rt_price_1m_map_capacity = rx_rt_price_1m_map.clone(); let mut rt_price_30m_map: HashMap> = HashMap::new(); let (tx_rt_price_30m_map, mut rx_rt_price_30m_map) = watch::channel(rt_price_30m_map); let mut rx2_rt_price_30m_map = rx_rt_price_30m_map.clone(); let mut rx3_rt_price_30m_map = rx_rt_price_30m_map.clone(); let mut rx4_rt_price_30m_map = rx_rt_price_30m_map.clone(); let mut rx5_rt_price_30m_map = rx_rt_price_30m_map.clone(); let mut rt_price_30m_map_capacity = rx_rt_price_30m_map.clone(); let mut rt_price_1d_map: HashMap> = HashMap::new(); let (tx_rt_price_1d_map, mut rx_rt_price_1d_map) = watch::channel(rt_price_1d_map); let mut rx2_rt_price_1d_map = rx_rt_price_1d_map.clone(); let mut rx3_rt_price_1d_map = rx_rt_price_1d_map.clone(); let mut rx4_rt_price_1d_map = rx_rt_price_1d_map.clone(); let mut rt_price_1d_map_capacity = rx_rt_price_1d_map.clone(); let mut rt_price_1w_map: HashMap> = HashMap::new(); let (tx_rt_price_1w_map, mut rx_rt_price_1w_map) = watch::channel(rt_price_1w_map); let mut rx2_rt_price_1w_map = rx_rt_price_1w_map.clone(); let mut rx3_rt_price_1w_map = rx_rt_price_1w_map.clone(); let mut rx4_rt_price_1w_map = rx_rt_price_1w_map.clone(); let mut rt_price_1w_map_capacity = rx_rt_price_1w_map.clone(); let mut rt_price_1mon_map: HashMap> = HashMap::new(); let (tx_rt_price_1mon_map, mut rx_rt_price_1mon_map) = watch::channel(rt_price_1mon_map); let mut rx2_rt_price_1mon_map = rx_rt_price_1mon_map.clone(); let mut rx3_rt_price_1mon_map = rx_rt_price_1mon_map.clone(); let mut rx4_rt_price_1mon_map = rx_rt_price_1mon_map.clone(); let mut rt_price_1mon_map_capacity = rx_rt_price_1mon_map.clone(); // TEMA data // let mut tema3_1m_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new(); // Vec<(symbol, Vec<(price, closetime)>)> // let (tx_tema3_1m_data, mut rx_tema3_1m_data) = watch::channel(tema3_1m_data); // let mut tema3_30m_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new(); // let (tx_tema3_30m_data, mut rx_tema3_30m_data) = watch::channel(tema3_30m_data); // let mut tema3_1d_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new(); // let (tx_tema3_1d_data, mut rx_tema3_1d_data) = watch::channel(tema3_1d_data); // let mut tema3_1w_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new(); // let (tx_tema3_1w_data, mut rx_tema3_1w_data) = watch::channel(tema3_1w_data); // let mut tema3_1mon_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new(); // let (tx_tema3_1mon_data, mut rx_tema3_1mon_data) = watch::channel(tema3_1mon_data); // let mut tema10_1m_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new(); // Vec<(symbol, Vec<(price, closetime)>)> // let (tx_tema10_1m_data, mut rx_tema10_1m_data) = watch::channel(tema10_1m_data); // let mut tema10_30m_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new(); // let (tx_tema10_30m_data, mut rx_tema10_30m_data) = watch::channel(tema10_30m_data); // let mut tema10_1d_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new(); // let (tx_tema10_1d_data, mut rx_tema10_1d_data) = watch::channel(tema10_1d_data); // let mut tema10_1w_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new(); // let (tx_tema10_1w_data, mut rx_tema10_1w_data) = watch::channel(tema10_1w_data); // let mut tema10_1mon_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new(); // let (tx_tema10_1mon_data, mut rx_tema10_1mon_data) = watch::channel(tema10_1mon_data); // let mut tema30_1m_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new(); // Vec<(symbol, Vec<(price, closetime)>)> // let (tx_tema30_1m_data, mut rx_tema30_1m_data) = watch::channel(tema30_1m_data); // let mut tema30_30m_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new(); // let (tx_tema30_30m_data, mut rx_tema30_30m_data) = watch::channel(tema30_30m_data); // let mut tema30_1d_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new(); // let (tx_tema30_1d_data, mut rx_tema30_1d_data) = watch::channel(tema30_1d_data); // let mut tema30_1w_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new(); // let (tx_tema30_1w_data, mut rx_tema30_1w_data) = watch::channel(tema30_1w_data); // let mut tema30_1mon_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new(); // let (tx_tema30_1mon_data, mut rx_tema30_1mon_data) = watch::channel(tema30_1mon_data); // Exchange Information data let mut exchange_info_map: HashMap = HashMap::new(); let (tx_exchange_info_map, mut rx_exchange_info_map) = watch::channel(exchange_info_map); let mut rx2_exchange_info_map = rx_exchange_info_map.clone(); let mut rx3_exchange_info_map = rx_exchange_info_map.clone(); let mut rx4_exchange_info_map = rx_exchange_info_map.clone(); let mut rx5_exchange_info_map = rx_exchange_info_map.clone(); let mut rx6_exchange_info_map = rx_exchange_info_map.clone(); let mut exchange_info_map_capacity = rx_exchange_info_map.clone(); { // pre-fetching candle data: 30m, 1d, 1w and 1mon print!("pre-fetching candle data: 30m, 1d, 1w and 1mon.."); std::io::stdout().flush(); let instant = Instant::now(); let interval_30m = String::from("30m"); let interval_1d = String::from("1d"); let interval_1w = String::from("1w"); let interval_1mon = String::from("1mon"); let mut candle_30m_map_temp: HashMap> = HashMap::new(); let mut candle_1d_map_temp: HashMap> = HashMap::new(); let mut candle_1w_map_temp: HashMap> = HashMap::new(); let mut candle_1mon_map_temp: HashMap> = HashMap::new(); request_candles::fetch_candle_parallel(&interval_30m, &mut candle_30m_map_temp).await; request_candles::fetch_candle_parallel(&interval_1d, &mut candle_1d_map_temp).await; request_candles::fetch_candle_parallel(&interval_1w, &mut candle_1w_map_temp).await; request_candles::fetch_candle_parallel(&interval_1mon, &mut candle_1mon_map_temp).await; tx_candle_30m_map.send_modify(|map| *map = candle_30m_map_temp); tx_candle_1d_map.send_modify(|map| *map = candle_1d_map_temp); tx_candle_1w_map.send_modify(|map| *map = candle_1w_map_temp); tx_candle_1mon_map.send_modify(|map| *map = candle_1mon_map_temp); // sleep as much as the loop recurs per 60 seconds, expecting child threads will have finished within 60 seconds. println!("Ok.."); unsafe { if RUNNING_MODE == REAL || RUNNING_MODE == SIMUL { let mut elapsed_time = instant.elapsed().as_secs(); let mut remaining_time: u64 = 60 - elapsed_time; while remaining_time > 0 && 60 > remaining_time { print!("\rstart tradingbot in {:2} seconds", remaining_time); io::stdout().flush(); elapsed_time = instant.elapsed().as_secs(); loop { let temp_elapsed_time = instant.elapsed().as_secs(); let is_overflowed = remaining_time.overflowing_sub(1).1; if is_overflowed == true { break; } if elapsed_time < temp_elapsed_time { remaining_time -= 1; break; } } } println!("\nTradingbot is running"); } } } // Task#0: monitoring tasks tokio::task::spawn(async move { loop { let result = rx_task0.recv().await; match result { Some(_) => { print!("\r{:2}", result.unwrap()); io::stdout().flush(); } None => {} } sleep(Duration::from_millis(50)).await; } }); // Task#1: check server time and health tokio::task::spawn(async move { let mut prev_server_epoch: u128 = 0; let mut epoch_difference: f64 = 0.0; let mut epoch_mean: f64 = 0.0; let mut elapsed_time: u128 = 0; loop { let instant = Instant::now(); let client = ClientBuilder::new() .timeout(Duration::from_millis(1000)) .build() .unwrap(); let mut usertime = UserTime::new(); let mut serverhealth = ServerHealth::new(); let mut epoch_difference_vec: Vec = Vec::new(); // let mut usertime_mutex = usertime_arc.lock().await; // server_health_check_team::execute_server_health_check(&mut usertime_mutex).await; // time_checking_team::execute_time_check(&mut usertime_mutex).await; let result = server_health_check_team::execute_server_health_check( &mut usertime, &mut serverhealth, &client, ) .await; while let Err(E) = time_checking_team::execute_time_check(&mut usertime, &client).await { sleep(Duration::from_millis((100))).await; while let Err(e) = server_health_check_team::execute_server_health_check( &mut usertime, &mut serverhealth, &client, ) .await {} } match result { Ok(T) => { local_epoch_tx1 .send(usertime.local_epoch) .expect("local_epoch_tx1-local_epoch_rx1 channel has been closed."); epoch_difference_tx1.send(usertime.epoch_difference).expect( "epoch_difference_tx1-epoch_difference_rx1 channel has been closed.", ); local_epoch_tx2 .send(usertime.local_epoch) .expect("local_epoch_tx2-local_epoch_rx2 channel has been closed."); epoch_difference_tx2.send(usertime.epoch_difference).expect( "epoch_difference_tx2-epoch_difference_rx2 channel has been closed.", ); tx_task1.send(1).expect("The mpsc channel has been closed."); } Err(E) => {} } // sleep as much as the loop recurs per 1 second if all operation finished within 1 second. // considering RTT and to make the wait time 1 second, latency each second for every 30 seconds is averaged and subtracted with wait time. elapsed_time = instant.elapsed().as_millis(); if 1_000 > elapsed_time { if prev_server_epoch != 0 { epoch_mean = 0.0; epoch_difference = (usertime.server_epoch as f64) - (prev_server_epoch as f64) - 1000.0; // milli second if epoch_difference > 300.0 || epoch_difference < -300.0 { epoch_difference_vec.clear(); epoch_difference = 0.0; } if epoch_difference_vec.len() == 10 { epoch_difference_vec.rotate_left(1); epoch_difference_vec.pop(); epoch_difference_vec.push(epoch_difference); } else if epoch_difference_vec.len() < 10 { epoch_difference_vec.push(epoch_difference); } for element in &epoch_difference_vec { epoch_mean += *element; } epoch_mean = epoch_mean / (epoch_difference_vec.len() as f64); } if epoch_mean > 0.0 { if elapsed_time > ((epoch_mean) as u128) && 1_000 > elapsed_time + ((epoch_mean) as u128) { sleep(Duration::from_millis( (1_000 - elapsed_time - ((epoch_mean) as u128)) as u64, )) .await; } else { sleep(Duration::from_millis((1_000 - elapsed_time) as u64)).await; } } else { sleep(Duration::from_millis( (1_000 - elapsed_time + ((epoch_mean.abs()) as u128)) as u64, )) .await; } } prev_server_epoch = usertime.server_epoch; } }); // Task#2: request trade fee tokio::task::spawn(async move { let mut elapsed_time = 0; loop { let instant = Instant::now(); let client = ClientBuilder::new() .timeout(tokio::time::Duration::from_millis(3000)) .build() .unwrap(); let tx1_changed = local_epoch_rx1.changed().await; let tx2_changed = epoch_difference_rx1.changed().await; let local_epoch = *local_epoch_rx1.borrow(); let difference_epoch = *epoch_difference_rx1.borrow(); match tx1_changed { Ok(T) => match tx2_changed { Ok(T) => { let mut tradefee_vec_temp: HashMap = HashMap::new(); loop { let result = request_others::request_trade_fee( API_KEY, SECRET_KEY, local_epoch, difference_epoch, &client, &mut tradefee_vec_temp, ) .await; if tradefee_vec_temp.len() == 0 { sleep(Duration::from_secs(3)).await; } else { break; } } tx_tradefee_map.send_modify(|vec| *vec = tradefee_vec_temp); tx_task2.send(2).expect("The mpsc channel has been closed."); } Err(E) => { panic!("tx2-rx2 channel has been closed.") } }, Err(E) => { panic!("tx1-rx1 channel has been closed.") } } // sleep as much as the loop recurs per 60 seconds if all operation finished within 60 seconds. elapsed_time = instant.elapsed().as_millis(); if 60_000 > elapsed_time { sleep(Duration::from_millis((60_000 - elapsed_time) as u64)).await; } } }); // Task#3: request lot stepsize and ticksize tokio::task::spawn(async move { loop { let client = ClientBuilder::new() .timeout(tokio::time::Duration::from_millis(3000)) .build() .unwrap(); let mut exchange_info_map_temp: HashMap = HashMap::new(); let mut result; loop { result = coin_health_check_team::request_others::request_exchange_infomation( &client, &mut exchange_info_map_temp, ) .await; // retry if exchange_info_map_temp.len() == 0 { sleep(Duration::from_secs(3)).await; } else { break; } } tx_exchange_info_map.send_modify(|vec| *vec = exchange_info_map_temp); tx_task3.send(3).expect("The mpsc channel has been closed."); sleep(Duration::from_secs(300)).await; // sleep for 5 mins } }); // Task#4: request 24h price changes, // pick valid USDT Trades, // filtering stop USDT Trades, // monitor total_24h_change_profit_index, // usdt_24h_change_profit_index, // total_price_down_dist_index tokio::task::spawn(async move { sleep(Duration::from_secs(10)).await; let mut elapsed_time = 0; loop { let instant = Instant::now(); let client = ClientBuilder::new() .timeout(tokio::time::Duration::from_millis(3000)) .build() .unwrap(); let result = request_others::request_24hr_ticker_price_change_statistics(&client).await; match result { Ok(T) => { let exchange_info_map = rx5_exchange_info_map.borrow().clone(); let mut valid_usdt_trade_set_temp: HashSet = HashSet::new(); monitors::collect_valid_usde_trade( &mut valid_usdt_trade_set_temp, &exchange_info_map, ) .await; tx_valid_usdt_trade_set.send_modify(|vec| *vec = valid_usdt_trade_set_temp); tx_task4.send(4).expect("The mpsc channel has been closed."); } Err(E) => { log::warn!(">>> Failed to monitor usdt_24h_change_profit_index."); } } // match result { // Ok(T) => { // let result = monitors::total_24h_change_profit_index().await; // match result { // Ok(T) => {}, // Err(E) => { // log::warn!(">>> Failed to monitor total_24h_change_profit_index."); // } // }; // let result = monitors::usdt_24h_change_profit_index().await; // match result { // Ok(T) => {}, // Err(E) => { // log::warn!(">>> Failed to monitor usdt_24h_change_profit_index."); // } // }; // let result = monitors::total_price_down_dist_index().await; // match result { // Ok(T) => {}, // Err(E) => { // log::warn!(">>> Failed to monitor total_price_down_dist_index."); // } // }; // }, // Err(E) => { // log::warn!(">>> Failed to fetch 24h price change data from endpoint or parse message."); // } // }; // sleep as much as the loop recurs per 30 seconds if all operation finished within 30 seconds. elapsed_time = instant.elapsed().as_millis(); if 30_000 > elapsed_time { sleep(Duration::from_millis((30_000 - elapsed_time) as u64)).await; } } }); // Task#5: price per second tokio::task::spawn(async move { sleep(Duration::from_secs(20)).await; let mut elapsed_time = 0; loop { let instant = Instant::now(); let client = ClientBuilder::new() .timeout(tokio::time::Duration::from_millis(1000)) .build() .unwrap(); let mut price_vec_temp: HashMap = HashMap::new(); let mut price_vec_temp_c: HashMap = HashMap::new(); request_others::request_all_coin_price(&client, &mut price_vec_temp).await; price_vec_temp_c = price_vec_temp.clone(); tx_price_map.send_modify(|vec| *vec = price_vec_temp); tx_task5.send(5).expect("The mpsc channel has been closed."); // Task#10: make realtime price data of 1m, 30m, 1d, 1w and 1mon let valid_usdt_trade_set = rx_valid_usdt_trade_set.borrow().clone(); // 1m let interval = String::from("1m"); let candle_1m_vec = rx_candle_1m_map.borrow().clone(); let dummy_data: HashMap> = HashMap::new(); let mut rt_price_1m_map_write_temp: HashMap> = HashMap::new(); let mut rt_price_1m_map_write_temp_c: HashMap> = HashMap::new(); let result = value_estimation_team::datapoints::price_data::update_realtime_price_data( &interval, &candle_1m_vec, &dummy_data, &mut rt_price_1m_map_write_temp, &price_vec_temp_c, &valid_usdt_trade_set, ) .await; rt_price_1m_map_write_temp_c = rt_price_1m_map_write_temp.clone(); if tx_rt_price_1m_map.is_closed() { log::error!("tx_rt_price_1m_vec has been closed!"); } else { tx_rt_price_1m_map.send_modify(|vec| *vec = rt_price_1m_map_write_temp); } // 30m let interval = String::from("30m"); let candle_30m_map = rx_candle_30m_map.borrow().clone(); let mut rt_price_30m_map_write_temp: HashMap> = HashMap::new(); let mut rt_price_30m_map_write_temp_c: HashMap> = HashMap::new(); if !rt_price_1m_map_write_temp_c.is_empty() { let result = value_estimation_team::datapoints::price_data::update_realtime_price_data( &interval, &candle_30m_map, &rt_price_1m_map_write_temp_c, &mut rt_price_30m_map_write_temp, &price_vec_temp_c, &valid_usdt_trade_set, ) .await; rt_price_30m_map_write_temp_c = rt_price_30m_map_write_temp.clone(); if tx_rt_price_30m_map.is_closed() { log::error!("tx_rt_price_30m_vec has been closed!"); } else { tx_rt_price_30m_map.send_modify( |map: &mut HashMap>| { *map = rt_price_30m_map_write_temp }, ); } } // 1d let interval = String::from("1d"); let candle_1d_vec = rx_candle_1d_map.borrow().clone(); let mut rt_price_1d_map_write_temp: HashMap> = HashMap::new(); if !rt_price_30m_map_write_temp_c.is_empty() { let result = value_estimation_team::datapoints::price_data::update_realtime_price_data( &interval, &candle_1d_vec, &rt_price_30m_map_write_temp_c, &mut rt_price_1d_map_write_temp, &price_vec_temp_c, &valid_usdt_trade_set, ) .await; match result { Ok(T) => { if tx_rt_price_1d_map.is_closed() { log::error!("tx_rt_price_1d_vec has been closed!"); } else { tx_rt_price_1d_map.send_modify(|map| *map = rt_price_1d_map_write_temp); } } Err(E) => {} } } // { // 1w // let interval = String::from("1w"); // let candle_1w_vec = rx_candle_1w_vec.borrow().clone(); // let rt_price_1d_vec = rx_rt_price_1d_vec.borrow().clone(); // let mut rt_price_1w_vec_write_temp: Vec<(String, Vec)> = Vec::new(); // let result = value_estimation_team::datapoints::price_data::update_realtime_price_data(&interval, &candle_1w_vec, &rt_price_1d_vec, &mut rt_price_1w_vec_write_temp, &price_vec_temp_c, &valid_usdt_trade_vec).await; // match result { // Ok(T) => { // tx_rt_price_1w_vec.send_modify(|vec| *vec = rt_price_1w_vec_write_temp); // tx_task10.send(10).expect("The mpsc channel has been closed."); // } // Err(E) => {} // } // } // { // 1mon // let interval = String::from("1mon"); // let candle_1mon_vec = rx_candle_1mon_vec.borrow().clone(); // let rt_price_1w_vec = rx_rt_price_1w_vec.borrow().clone(); // let mut rt_price_1mon_vec_write_temp: Vec<(String, Vec)> = Vec::new(); // let result = value_estimation_team::datapoints::price_data::update_realtime_price_data(&interval, &candle_1mon_vec, &rt_price_1w_vec, &mut rt_price_1mon_vec_write_temp, &price_vec_temp_c, &valid_usdt_trade_vec).await; // match result { // Ok(T) => { // tx_rt_price_1mon_vec.send_modify(|vec| *vec = rt_price_1mon_vec_write_temp); // tx_task10.send(10).expect("The mpsc channel has been closed."); // } // Err(E) => {} // } // } // sleep as much as the loop recurs per 1 seconds if all operation finished within 1 seconds. elapsed_time = instant.elapsed().as_millis(); if 500 > elapsed_time { sleep(Duration::from_millis((500 - elapsed_time) as u64)).await; } } }); // Task#6: fetch candle 1m tokio::task::spawn(async move { let mut elapsed_time = 0; let interval = String::from("1m"); loop { let instant = Instant::now(); let mut candle_1m_map_temp: HashMap> = HashMap::new(); let result = request_candles::fetch_candle_parallel(&interval, &mut candle_1m_map_temp).await; match result { Ok(T) => { tx_candle_1m_map.send_modify(|vec| *vec = candle_1m_map_temp); tx_task6.send(6).expect("The mpsc channel has been closed."); } Err(E) => {} } // sleep as much as the loop recurs per 10 seconds, expecting child threads will have finished within 10 seconds. elapsed_time = instant.elapsed().as_millis(); if 30_000 > elapsed_time { // 10_000 for major trade sleep(Duration::from_millis((30_000 - elapsed_time) as u64)).await; } } }); // Task#7: fetch candle 30m tokio::task::spawn(async move { let mut elapsed_time = 0; let interval = String::from("30m"); loop { let instant = Instant::now(); let mut candle_30m_map_temp: HashMap> = HashMap::new(); let result = request_candles::fetch_candle_delay(&interval, &mut candle_30m_map_temp).await; // request_candles::fetch_candle_parallel(&interval, &mut candle_30m_vec_temp).await; match result { Ok(T) => { tx_candle_30m_map.send_modify(|vec| *vec = candle_30m_map_temp); tx_task7.send(7).expect("The mpsc channel has been closed."); } Err(E) => {} } // sleep as much as the loop recurs per 60 seconds, expecting child threads will have finished within 60 seconds. elapsed_time = instant.elapsed().as_millis(); if 60_000 > elapsed_time { //60_000 sleep(Duration::from_millis((60_000 - elapsed_time) as u64)).await; } } }); // Task#8: fetch candle 1d tokio::task::spawn(async move { let mut elapsed_time = 0; let interval = String::from("1d"); sleep(Duration::from_secs(600)).await; loop { let instant = Instant::now(); let mut candle_1d_map_temp: HashMap> = HashMap::new(); let result = request_candles::fetch_candle_delay(&interval, &mut candle_1d_map_temp).await; match result { Ok(T) => { tx_candle_1d_map.send_modify(|vec| *vec = candle_1d_map_temp); tx_task8.send(8).expect("The mpsc channel has been closed."); } Err(E) => {} } // sleep as much as the loop recurs per 1800 seconds, expecting child threads will have finished within 1800 seconds. elapsed_time = instant.elapsed().as_secs(); if 1_800 > elapsed_time { sleep(Duration::from_secs((1_800 - elapsed_time) as u64)).await; } } }); // // Task#9: fetch candle 1w // tokio::task::spawn(async move{ // let interval = String::from("1w"); // sleep(Duration::from_secs(600)).await; // loop{ // let mut candle_1w_vec_temp: Vec<(String, Vec)> = Vec::new(); // let result = request_candles::fetch_candle_delay(&interval, &mut candle_1w_vec_temp).await; // match result { // Ok(T) => { // tx_candle_1w_vec.send_modify(|vec| *vec = candle_1w_vec_temp); // tx_task9.send(9).expect("The mpsc channel has been closed."); // } // Err(E) => {} // } // } // }); // // Task#10: fetch candle 1mon // tokio::task::spawn(async move{ // let interval = String::from("1mon"); // sleep(Duration::from_secs(600)).await; // loop{ // let mut candle_1mon_vec_temp: Vec<(String, Vec)> = Vec::new(); // let result = request_candles::fetch_candle_delay(&interval, &mut candle_1mon_vec_temp).await; // match result { // Ok(T) => { // tx_candle_1mon_vec.send_modify(|vec| *vec = candle_1mon_vec_temp); // tx_task10.send(10).expect("The mpsc channel has been closed."); // } // Err(E) => {} // } // } // }); // Task#11: monitoring total market cap // if RUNNING_MODE == REAL || RUNNING_MODE == SIMUL || RUNNING_MODE == TEST { // tokio::task::spawn(async move { // loop { // let result = signal_association::coinmarketcap::market_cap_index().await; // match result { // Ok(T) => { // tx_task11 // .send(11) // .expect("The mpsc channel has been closed."); // } // Err(E) => {} // } // } // }); // } // Task#12: monitoring foreign exchange rate // if RUNNING_MODE == REAL || RUNNING_MODE == SIMUL { // tokio::task::spawn(async move { // loop { // let result = signal_association::exchange_rate::monitoring_fx_rate_index().await; // match result { // Ok(T) => { // tx_task12 // .send(12) // .expect("The mpsc channel has been closed."); // } // Err(E) => {} // } // sleep(Duration::from_secs(2700)).await; // sleep for 45mins // } // }); // } // // // Task#13: monitoring dollar index // // tokio::task::spawn(async move { // // loop // // { // // let result = signal_association::dollar_index::monitoring_dollar_index().await; // // match result { // // Ok(T) => { // // tx_task13.send(13).expect("The mpsc channel has been closed."); // // } // // Err(E) => {} // // } // // sleep(Duration::from_secs(1800)).await; // sleep for 30mins // // } // // }); // Task#14: monitoring signal decision // if RUNNING_MODE == REAL || RUNNING_MODE == SIMUL || RUNNING_MODE == TEST { // tokio::task::spawn(async move { // loop { // let result = // signal_association::signal_decision::monitoring_signal_decision().await; // match result { // Ok(T) => { // tx_task14 // .send(14) // .expect("The mpsc channel has been closed."); // } // Err(E) => {} // } // sleep(Duration::from_millis(500)).await; // sleep for 0.5sec // } // }); // } // Task#15: monitoring future ratio // if RUNNING_MODE == REAL || RUNNING_MODE == SIMUL || RUNNING_MODE == TEST { // tokio::task::spawn(async move { // loop { // let result = signal_association::future_ratio::monitoring_future_ratio().await; // match result { // Ok(T) => { // tx_task15 // .send(15) // .expect("The mpsc channel has been closed."); // } // Err(E) => {} // } // sleep(Duration::from_secs(60)).await; // sleep for 1min // } // }); // } // COEX part // Task#16: execute strategis for buy unsafe { if RUNNING_MODE == REAL || RUNNING_MODE == SIMUL { tokio::task::spawn(async move { sleep(Duration::from_secs(40)).await; let mut elapsed_time = 0; loop { let instant = Instant::now(); let mut all_data = AllData::new(); all_data.valid_symbol_vec = rx3_valid_usdt_trade_set.borrow().clone(); // realtime price data all_data.rt_price_1m_vec = rx3_rt_price_1m_map.borrow().clone(); all_data.rt_price_30m_vec = rx3_rt_price_30m_map.borrow().clone(); all_data.rt_price_1d_vec = rx3_rt_price_1d_map.borrow().clone(); all_data.rt_price_1w_vec = rx3_rt_price_1w_map.borrow().clone(); all_data.rt_price_1mon_vec = rx3_rt_price_1mon_map.borrow().clone(); // future exchange info let futures_exchange_info = rx3_futures_exchange_info.borrow().clone(); let result = strategy_team::strategy_manager::execute_list_up_for_buy(&all_data, &futures_exchange_info).await; match result { Ok(T) => { tx_task16 .send(16) .expect("The mpsc channel has been closed."); } Err(E) => { // log::error!("Couldn't execute strategists."); } } // sleep as much as the loop recurs per 1 second if all operation finished within 1 second. elapsed_time = instant.elapsed().as_millis(); if 250 > elapsed_time { sleep(Duration::from_millis((250 - elapsed_time) as u64)).await; } } }); } else { tokio::task::spawn(async move { sleep(Duration::from_secs(10)).await; let mut elapsed_time = 0; let mut all_data = AllData::new(); loop { let instant = Instant::now(); all_data.valid_symbol_vec = rx3_valid_usdt_trade_set.borrow().clone(); // realtime price data all_data.rt_price_1m_vec = rx3_rt_price_1m_map.borrow().clone(); all_data.rt_price_30m_vec = rx3_rt_price_30m_map.borrow().clone(); all_data.rt_price_1d_vec = rx3_rt_price_1d_map.borrow().clone(); all_data.rt_price_1w_vec = rx3_rt_price_1w_map.borrow().clone(); all_data.rt_price_1mon_vec = rx3_rt_price_1mon_map.borrow().clone(); // let result = coex::strategy_team::execute_strategist_for_test(&all_data).await; // match result { // Ok(T) => { // tx_task16 // .send(16) // .expect("The mpsc channel has been closed."); // } // Err(E) => {} // } // sleep as much as the loop recurs per 1 second if all operation finished within 1 second. elapsed_time = instant.elapsed().as_millis(); if 250 > elapsed_time { sleep(Duration::from_millis((250 - elapsed_time) as u64)).await; } } }); } } // Task#17: execute strategis for sell unsafe { if RUNNING_MODE == REAL || RUNNING_MODE == SIMUL { tokio::task::spawn(async move { sleep(Duration::from_secs(40)).await; let mut elapsed_time = 0; loop { let instant = Instant::now(); let mut all_data = AllData::new(); let mut exchange_info_map: HashMap = HashMap::new(); let mut trade_fee_map: HashMap = HashMap::new(); all_data.valid_symbol_vec = rx4_valid_usdt_trade_set.borrow().clone(); exchange_info_map = rx6_exchange_info_map.borrow().clone(); trade_fee_map = rx5_tradefee_map.borrow().clone(); // realtime price data all_data.rt_price_1m_vec = rx5_rt_price_1m_map.borrow().clone(); all_data.rt_price_30m_vec = rx5_rt_price_30m_map.borrow().clone(); all_data.rt_price_1d_vec = rx4_rt_price_1d_map.borrow().clone(); all_data.rt_price_1w_vec = rx4_rt_price_1w_map.borrow().clone(); all_data.rt_price_1mon_vec = rx4_rt_price_1mon_map.borrow().clone(); let result = strategy_team::strategy_manager::execute_list_up_for_sell( &all_data, &exchange_info_map, &trade_fee_map, ) .await; match result { Ok(T) => { tx_task17 .send(17) .expect("The mpsc channel has been closed."); } Err(E) => { // log::error!("Couldn't execute strategists."); } } // sleep as much as the loop recurs per 1 second if all operation finished within 1 second. elapsed_time = instant.elapsed().as_millis(); if 250 > elapsed_time { sleep(Duration::from_millis((250 - elapsed_time) as u64)).await; } } }); } } // Task#18: monitoring pre-suggested coins tokio::task::spawn(async move { sleep(Duration::from_secs(30)).await; let mut elapsed_time = 0; loop { let instant = Instant::now(); let coin_price_vec = rx3_price_map.borrow().clone(); let result = coex::exchange_team::monitoring_pre_suggested_coins(&coin_price_vec).await; // send Task#0 a message to notify running on match result { Ok(T) => { tx_task18 .send(18) .expect("The mpsc channel has been closed."); } Err(E) => {} } // sleep as much as the loop recurs per 1 second if all operation finished within 1 second. elapsed_time = instant.elapsed().as_millis(); if 100 > elapsed_time { sleep(Duration::from_millis((100 - elapsed_time) as u64)).await; } } }); // Task#19: buy_coin tokio::task::spawn(async move { sleep(Duration::from_secs(30)).await; let mut elapsed_time = 0; loop { let instant = Instant::now(); let exchange_info_map = rx_exchange_info_map.borrow().clone(); let trade_fee_map = rx_tradefee_map.borrow().clone(); let result = coex::exchange_team::buy_coin(&exchange_info_map, &trade_fee_map).await; // send Task#0 a message to notify running on match result { Ok(T) => { tx_task19 .send(19) .expect("The mpsc channel has been closed."); } Err(E) => { eprint!("Error: {:?}", E); } } // sleep as much as the loop recurs per 200ms second if all operation finished within 200ms elapsed_time = instant.elapsed().as_millis(); if 200 > elapsed_time { sleep(Duration::from_millis((200 - elapsed_time) as u64)).await; } } }); // Task#20: monitoring_open_buy_order unsafe { if RUNNING_MODE == REAL || RUNNING_MODE == TEST { tokio::task::spawn(async move { sleep(Duration::from_secs(30)).await; let mut elapsed_time = 0; loop { let instant = Instant::now(); let client = ClientBuilder::new() .timeout(tokio::time::Duration::from_millis(3000)) .build() .unwrap(); let trade_fee_map = rx2_tradefee_map.borrow().clone(); let result = coex::order_team::monitoring_open_buy_order(&client, &trade_fee_map).await; // send Task#0 a message to notify running on match result { Ok(T) => { tx_task20 .send(20) .expect("The mpsc channel has been closed."); } Err(E) => {} } // sleep as much as the loop recurs per 1 second if all operation finished within 1 second. elapsed_time = instant.elapsed().as_millis(); if 200 > elapsed_time { sleep(Duration::from_millis((200 - elapsed_time) as u64)).await; } } }); } } // Task#21: update_price_of_filled_buy_order tokio::task::spawn(async move { sleep(Duration::from_secs(30)).await; let mut elapsed_time = 0; loop { let instant = Instant::now(); let coin_price_vec = rx5_price_map.borrow().clone(); let exchange_info_vec = rx2_exchange_info_map.borrow().clone(); let trade_fee_vec = rx3_tradefee_map.borrow().clone(); let result = coex::order_team::update_price_of_filled_buy_order( &coin_price_vec, &exchange_info_vec, &trade_fee_vec, ) .await; // send Task#0 a message to notify running on match result { Ok(T) => { tx_task21 .send(21) .expect("The mpsc channel has been closed."); } Err(E) => {} } // sleep as much as the loop recurs per 1 second if all operation finished within 1 second. elapsed_time = instant.elapsed().as_millis(); if 250 > elapsed_time { sleep(Duration::from_millis((250 - elapsed_time) as u64)).await; } } }); // Task#22: monitoring_open_sell_order unsafe { if RUNNING_MODE == REAL || RUNNING_MODE == TEST { tokio::task::spawn(async move { sleep(Duration::from_secs(30)).await; let mut elapsed_time = 0; loop { let instant = Instant::now(); let client = ClientBuilder::new() .timeout(tokio::time::Duration::from_millis(3000)) .build() .unwrap(); let exchange_info_vec = rx4_exchange_info_map.borrow().clone(); let trade_fee_vec = rx4_tradefee_map.borrow().clone(); let result = coex::order_team::monitoring_open_sell_order( &client, &exchange_info_vec, &trade_fee_vec, ) .await; // send Task#0 a message to notify running on match result { Ok(T) => { tx_task22 .send(22) .expect("The mpsc channel has been closed."); } Err(E) => {} } // sleep as much as the loop recurs per 1 second if all operation finished within 1 second. elapsed_time = instant.elapsed().as_millis(); if 250 > elapsed_time { sleep(Duration::from_millis((250 - elapsed_time) as u64)).await; } } }); } } // Task#23: monitoring_filled_sell_order tokio::task::spawn(async move { let mut elapsed_time = 0; loop { let instant = Instant::now(); let client = ClientBuilder::new() .timeout(tokio::time::Duration::from_millis(3000)) .build() .unwrap(); let result = coex::order_team::monitoring_filled_sell_order(&client).await; // send Task#0 a message to notify running on match result { Ok(T) => { tx_task23 .send(23) .expect("The mpsc channel has been closed."); } Err(E) => {} } // sleep as much as the loop recurs per 1 second if all operation finished within 1 second. elapsed_time = instant.elapsed().as_millis(); if 200 > elapsed_time { sleep(Duration::from_millis((200 - elapsed_time) as u64)).await; } } }); // Task#24: monitoring_scoreboard tokio::task::spawn(async move { let mut elapsed_time = 0; loop { let instant = Instant::now(); let mut all_data = AllData::new(); // realtime price data all_data.rt_price_1m_vec = rx4_rt_price_1m_map.borrow().clone(); let result = coex::exchange_team::monitoring_scoreboard(&all_data).await; // send Task#0 a message to notify running on match result { Ok(T) => { tx_task24 .send(24) .expect("The mpsc channel has been closed."); } Err(E) => {} } // sleep as much as the loop recurs per 1 second if all operation finished within 1 second. elapsed_time = instant.elapsed().as_millis(); if 200 > elapsed_time { sleep(Duration::from_millis((200 - elapsed_time) as u64)).await; } } }); // Task#25: update current_total_usdt and available_usdt tokio::task::spawn(async move { let mut previous_result = false; let mut elapsed_time = 0; loop { let instant = Instant::now(); let client = ClientBuilder::new() .timeout(tokio::time::Duration::from_millis(3000)) .build() .unwrap(); let result = coex::assets_managing_team::monitoring_asset_usdt(&mut previous_result, &client) .await; // send Task#0 a message to notify running on match result { Ok(T) => { tx_task25 .send(25) .expect("The mpsc channel has been closed."); } Err(E) => { log::error!("{}", E); } } // sleep as much as the loop recurs per 300ms if all operation finished within 300ms. elapsed_time = instant.elapsed().as_millis(); if 250 > elapsed_time { sleep(Duration::from_millis((250 - elapsed_time) as u64)).await; } } }); // Task#26: update kelly_criterion tokio::task::spawn(async move { let mut elapsed_time = 0; loop { let instant = Instant::now(); let result = coex::assets_managing_team::update_kelly_criterion().await; // send Task#0 a message to notify running on match result { Ok(T) => { tx_task26 .send(26) .expect("The mpsc channel has been closed."); } Err(E) => { log::error!("{}", E); } } // sleep as much as the loop recurs per 1 minutes elapsed_time = instant.elapsed().as_secs(); if 60 > elapsed_time { sleep(Duration::from_secs((60 - elapsed_time) as u64)).await; } } }); // Task#27: request delist symbols tokio::task::spawn(async move { let mut elapsed_time = 0; loop { let instant = Instant::now(); let client = ClientBuilder::new() .timeout(tokio::time::Duration::from_millis(3000)) .build() .unwrap(); let tx1_changed = local_epoch_rx2.changed().await; let tx2_changed = epoch_difference_rx2.changed().await; let local_epoch = *local_epoch_rx2.borrow(); let difference_epoch = *epoch_difference_rx2.borrow(); let result = request_others::request_delist_symbols( API_KEY, SECRET_KEY, local_epoch, difference_epoch, &client, ) .await; tx_task27 .send(27) .expect("The mpsc channel has been closed."); // sleep as much as the loop recurs per 300 seconds if all operation finished within 300 seconds. elapsed_time = instant.elapsed().as_secs(); if 300 > elapsed_time { sleep(Duration::from_secs((300 - elapsed_time) as u64)).await; } } }); // Futures Section // Task#XX: get future last price tokio::task::spawn(async move { sleep(Duration::from_secs(20)).await; let mut elapsed_time = 0; loop { let instant = Instant::now(); let client = ClientBuilder::new() .timeout(tokio::time::Duration::from_millis(1000)) .build() .unwrap(); let mut future_price_map_temp: HashMap = HashMap::new(); future::order::get_last_price(&client, &mut future_price_map_temp).await; if future_price_map_temp.len() != 0 { tx_future_price_map.send_modify(|vec| *vec = future_price_map_temp); } } if 333 > elapsed_time { sleep(Duration::from_millis((333 - elapsed_time) as u64)).await; } }); // Task#XX: get future exchange information tokio::task::spawn(async move { sleep(Duration::from_secs(20)).await; let client = ClientBuilder::new() .timeout(Duration::from_millis(1000)) .build() .unwrap(); let mut elapsed_time = 0; loop { let instant = Instant::now(); let mut futures_exchange_info_map_temp: HashMap = HashMap::new(); let result = future::order::request_future_exchange_infomation( &client, &mut futures_exchange_info_map_temp, ) .await; if tx_futures_exchange_info.is_closed() { log::error!("tx_futures_exchange_info has been closed!"); } else { if futures_exchange_info_map_temp.len() != 0 { tx_futures_exchange_info.send_modify(|map| *map = futures_exchange_info_map_temp); } } // send Task#0 a message to notify running on match result { Ok(T) => { } Err(E) => {} } // sleep as much as the loop recurs per 10 second if all operation finished within 10 second. elapsed_time = instant.elapsed().as_millis(); if 10000 > elapsed_time { sleep(Duration::from_millis((10000 - elapsed_time) as u64)).await; } } }); // Task#XX: get futures trade fee and available balance(USDT) tokio::task::spawn(async move { sleep(Duration::from_secs(3)).await; let mut elapsed_time = 0; loop { let instant = Instant::now(); let client = ClientBuilder::new() .timeout(tokio::time::Duration::from_millis(1000)) .build() .unwrap(); let mut futures_trade_fee_temp = FuturesTradeFee::new(); future::table_mgmt::get_tradefee_balance(&mut futures_trade_fee_temp, &client).await; tx_futures_trade_fee.send_modify(|vec| *vec = futures_trade_fee_temp); } if 2000 > elapsed_time { sleep(Duration::from_millis((2000 - elapsed_time) as u64)).await; } }); // Task#XX: update price of filled positions tokio::task::spawn(async move { sleep(Duration::from_secs(20)).await; let mut elapsed_time = 0; loop { let instant = Instant::now(); let coin_price_map = rx_future_price_map.borrow().clone(); let futures_exchange_info_map = rx_futures_exchange_info.borrow().clone(); let future_trade_fee = rx_futures_trade_fee.borrow().clone(); let result = future::table_mgmt::update_price_of_filled_positions( &coin_price_map, &futures_exchange_info_map, &future_trade_fee, ) .await; // send Task#0 a message to notify running on match result { Ok(T) => { } Err(E) => {} } // sleep as much as the loop recurs per 1 second if all operation finished within 1 second. elapsed_time = instant.elapsed().as_millis(); if 100 > elapsed_time { sleep(Duration::from_millis((100 - elapsed_time) as u64)).await; } } }); // Task#XX: monitoring ordered positions tokio::task::spawn(async move { sleep(Duration::from_secs(20)).await; let mut elapsed_time = 0; loop { let instant = Instant::now(); let client = ClientBuilder::new() .timeout(tokio::time::Duration::from_millis(1000)) .build() .unwrap(); let future_trade_fee = rx2_futures_trade_fee.borrow().clone(); let result = future::order::monitoring_unfilled_order( &client, &future_trade_fee, ) .await; // send Task#0 a message to notify running on match result { Ok(T) => { } Err(E) => {} } // sleep as much as the loop recurs per 1 second if all operation finished within 1 second. elapsed_time = instant.elapsed().as_millis(); if 100 > elapsed_time { sleep(Duration::from_millis((100 - elapsed_time) as u64)).await; } } }); // Task#XX: monitoring ordered positions tokio::task::spawn(async move { sleep(Duration::from_secs(20)).await; let mut elapsed_time = 0; loop { let instant = Instant::now(); let coin_price_map = rx2_future_price_map.borrow().clone(); let futures_exchange_info_map = rx2_futures_exchange_info.borrow().clone(); let future_trade_fee = rx3_futures_trade_fee.borrow().clone(); let result = future::order::entry_position( &coin_price_map, &futures_exchange_info_map, &future_trade_fee, ) .await; // send Task#0 a message to notify running on match result { Ok(T) => { } Err(E) => {} } // sleep as much as the loop recurs per 1 second if all operation finished within 1 second. elapsed_time = instant.elapsed().as_millis(); if 50 > elapsed_time { sleep(Duration::from_millis((50 - elapsed_time) as u64)).await; } } }); loop {} Ok(()) } fn program_setting() { let matches = Command::new("Tradingbot") .arg( arg!(log_level: -l --log "Select log level: trace, debug, info, warn, error") .default_value("error"), ) .arg(arg!(mode: -m --mode "Select mode: real, simul, test").required(true)) .get_matches(); // set log level set_up_color_terminal(); if let Some(level) = matches.get_one::("log_level") { match level.clone().to_ascii_lowercase().as_str() { "trace" => simple_logger::init_with_level(Level::Trace).unwrap(), "debug" => simple_logger::init_with_level(Level::Debug).unwrap(), "info" => simple_logger::init_with_level(Level::Info).unwrap(), "warn" => simple_logger::init_with_level(Level::Warn).unwrap(), "error" => simple_logger::init_with_level(Level::Error).unwrap(), _ => { simple_logger::init_with_level(Level::Error).unwrap(); log::error!("wrong log level argument."); std::process::exit(0); } } } // set mode let mut mode: String = matches.get_one::("mode").unwrap().clone(); unsafe { match mode.to_ascii_lowercase().as_str() { "real" => { RUNNING_MODE = RunningMode::REAL; println!("*** REAL MODE ***"); } "simul" => { RUNNING_MODE = RunningMode::SIMUL; println!("*** SIMULATION MODE ***"); } "test" => { RUNNING_MODE = RunningMode::TEST; println!("*** TEST MODE ***"); } _ => { log::error!("wrong mode argument."); std::process::exit(0); } } } }