tradingbot/src/main.rs

1653 lines
68 KiB
Rust

#![allow(unused)]
#![allow(warnings)]
use crate::coin_health_check_team::*;
use crate::request_candles::CandleData;
use crate::request_others::{ExchangeInfo, TradeFee};
use crate::server_health_check_team::ServerHealth;
use crate::strategy_team::AllData;
use crate::time_checking_team::UserTime;
use crate::value_estimation_team::datapoints::price_data::RealtimePriceData;
use clap::{arg, Command};
use log::Level;
use reqwest::{Client, ClientBuilder};
use rust_decimal::Decimal;
use simple_logger::set_up_color_terminal;
use sqlx::{mysql::*, Connection, Executor, FromRow, Row};
use tradingbot::future::{FuturesExchangeInfo, FuturesTradeFee};
use std::collections::{HashMap, HashSet};
use std::{
io::{self, Write},
sync::Arc,
};
use tokio::{fs::*, join, sync::mpsc, sync::watch, sync::Mutex, task::*, time::*};
use tradingbot::{RunningMode::*, *};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// future last price data
let mut future_price_map: HashMap<String, f64> = HashMap::new(); // <symbol, price>
let (tx_future_price_map, mut rx_future_price_map) = watch::channel(future_price_map);
let mut rx2_future_price_map = rx_future_price_map.clone();
let mut futures_exchange_info_map: HashMap<String, FuturesExchangeInfo> = HashMap::new();
let (tx_futures_exchange_info, mut rx_futures_exchange_info) = watch::channel(futures_exchange_info_map);
let mut rx2_futures_exchange_info = rx_futures_exchange_info.clone();
let mut rx3_futures_exchange_info = rx_futures_exchange_info.clone();
let mut rx4_futures_exchange_info = rx_futures_exchange_info.clone();
let mut futures_trade_fee = FuturesTradeFee::new();
let (tx_futures_trade_fee, mut rx_futures_trade_fee) = watch::channel(futures_trade_fee);
let mut rx2_futures_trade_fee = rx_futures_trade_fee.clone();
let mut rx3_futures_trade_fee = rx_futures_trade_fee.clone();
// parse argument and set program preference
program_setting();
// Datebase initialization
initialization::initialization().await;
// // let c1 = vec!(("name", "Shima Nabil"));
// // println!("{:?}", database_control::update_record(&table_name, &c1, &("grade", "3")).await);
// //database_control::exists_table(&table_name).await;
// // let table_name = String::from("all24hstatistics");
// // let result = database_control::count_rows(&table_name).await;
// // println!("{:?}", result);
// // let rows = sqlx::query("select * from serverhealth")
// // .map(|r| {
// // a.id = r.get::<i32, _>("id");
// // a.waiting_maximum = r.get::<i32, _>("waiting_maximum");
// // a.server_on = r.get::<bool, _>("server_on");
// // a.ping_on = r.get::<bool, _>("ping_on");
// // a.wallet_system_on = r.get::<bool, _>("wallet_system_on");
// //
// // })
// // .collect::<Vec<String>>()
// // .fetch(&pool).await;
// // let str_result = rows
// // .iter()
// // .map(|r| format!("{} {}", r.get::<i32, _>("id"), r.get::<bool, _>("server_on")))
// // .collect::<Vec<String>>()
// // .join(", ");
// // println!("{:?}", rows);
// //
let (tx_task1, mut rx_task0) = mpsc::unbounded_channel::<u32>();
let tx_task2 = tx_task1.clone(); // for Task#2
let tx_task3 = tx_task1.clone(); // for Task#3
let tx_task4 = tx_task1.clone(); // for Task#4
let tx_task5 = tx_task1.clone(); // for Task#5
let tx_task6 = tx_task1.clone(); // for Task#6
let tx_task7 = tx_task1.clone(); // for Task#7
let tx_task8 = tx_task1.clone(); // for Task#8
let tx_task9 = tx_task1.clone(); // for Task#9
let tx_task10 = tx_task1.clone(); // for Task#10
let tx_task11 = tx_task1.clone(); // for Task#11
let tx_task12 = tx_task1.clone(); // for Task#12
let tx_task13 = tx_task1.clone(); // for Task#13
let tx_task14 = tx_task1.clone(); // for Task#14
let tx_task15 = tx_task1.clone(); // for Task#15
let tx_task16 = tx_task1.clone(); // for Task#16
let tx_task17 = tx_task1.clone(); // for Task#17
let tx_task18 = tx_task1.clone(); // for Task#18
let tx_task19 = tx_task1.clone(); // for Task#19
let tx_task20 = tx_task1.clone(); // for Task#20
let tx_task21 = tx_task1.clone(); // for Task#21
let tx_task22 = tx_task1.clone(); // for Task#22
let tx_task23 = tx_task1.clone(); // for Task#23
let tx_task24 = tx_task1.clone(); // for Task#24
let tx_task25 = tx_task1.clone(); // for Task#25
let tx_task26 = tx_task1.clone(); // for Task#26
let tx_task27 = tx_task1.clone(); // for Task#27
let (local_epoch_tx1, mut local_epoch_rx1) = watch::channel(0); // local_epoch
let (epoch_difference_tx1, mut epoch_difference_rx1) = watch::channel(0); // epoch_difference
let (local_epoch_tx2, mut local_epoch_rx2) = watch::channel(0); // local_epoch
let (epoch_difference_tx2, mut epoch_difference_rx2) = watch::channel(0); // epoch_difference
// trade fee data
let mut tradefee_map: HashMap<String, TradeFee> = HashMap::new(); // <symbol, TradeFee>
let (tx_tradefee_map, mut rx_tradefee_map) = watch::channel(tradefee_map);
let mut rx2_tradefee_map = rx_tradefee_map.clone();
let mut rx3_tradefee_map = rx_tradefee_map.clone();
let mut rx4_tradefee_map = rx_tradefee_map.clone();
let mut rx5_tradefee_map = rx_tradefee_map.clone();
let mut tradefee_map_capacity = rx_tradefee_map.clone();
// valid usdt trade data
let mut valid_usdt_trade_set: HashSet<String> = HashSet::new(); // symbol
let (tx_valid_usdt_trade_set, mut rx_valid_usdt_trade_set) =
watch::channel(valid_usdt_trade_set);
let mut rx2_valid_usdt_trade_set = rx_valid_usdt_trade_set.clone();
let mut rx3_valid_usdt_trade_set = rx_valid_usdt_trade_set.clone();
let mut rx4_valid_usdt_trade_set = rx_valid_usdt_trade_set.clone();
let mut valid_usdt_trade_set_capacity = rx_valid_usdt_trade_set.clone();
// price per second data and channels
let mut price_map: HashMap<String, f64> = HashMap::new(); // <symbol, price>
let (tx_price_map, mut rx_price_map) = watch::channel(price_map);
let mut rx3_price_map = rx_price_map.clone();
let mut rx5_price_map = rx_price_map.clone();
let mut price_map_capacity = rx_price_map.clone();
// candle data from endpoint and channels
let mut candle_1m_map: HashMap<String, Vec<CandleData>> = HashMap::new(); // <symbol, Vec<CandleData>>
let (tx_candle_1m_map, mut rx_candle_1m_map) = watch::channel(candle_1m_map);
let mut candle_1m_map_capacity = rx_candle_1m_map.clone();
let mut candle_30m_map: HashMap<String, Vec<CandleData>> = HashMap::new();
let (tx_candle_30m_map, mut rx_candle_30m_map) = watch::channel(candle_30m_map);
let mut candle_30m_map_capacity = rx_candle_30m_map.clone();
let mut candle_1d_map: HashMap<String, Vec<CandleData>> = HashMap::new();
let (tx_candle_1d_map, mut rx_candle_1d_map) = watch::channel(candle_1d_map);
let mut candle_1d_map_capacity = rx_candle_1d_map.clone();
let mut candle_1w_map: HashMap<String, Vec<CandleData>> = HashMap::new();
let (tx_candle_1w_map, mut rx_candle_1w_map) = watch::channel(candle_1w_map);
let mut candle_1w_map_capacity = rx_candle_1w_map.clone();
let mut candle_1mon_map: HashMap<String, Vec<CandleData>> = HashMap::new();
let (tx_candle_1mon_map, mut rx_candle_1mon_map) = watch::channel(candle_1mon_map);
let mut candle_1mon_map_capacity = rx_candle_1mon_map.clone();
// real-time reflected price data and channels
let mut rt_price_1m_map: HashMap<String, Vec<RealtimePriceData>> = HashMap::new(); // <symbol, Vec<RealtimePriceData>>
let (tx_rt_price_1m_map, mut rx_rt_price_1m_map) = watch::channel(rt_price_1m_map);
let mut rx2_rt_price_1m_map = rx_rt_price_1m_map.clone();
let mut rx3_rt_price_1m_map = rx_rt_price_1m_map.clone();
let mut rx4_rt_price_1m_map = rx_rt_price_1m_map.clone();
let mut rx5_rt_price_1m_map = rx_rt_price_1m_map.clone();
let mut rt_price_1m_map_capacity = rx_rt_price_1m_map.clone();
let mut rt_price_30m_map: HashMap<String, Vec<RealtimePriceData>> = HashMap::new();
let (tx_rt_price_30m_map, mut rx_rt_price_30m_map) = watch::channel(rt_price_30m_map);
let mut rx2_rt_price_30m_map = rx_rt_price_30m_map.clone();
let mut rx3_rt_price_30m_map = rx_rt_price_30m_map.clone();
let mut rx4_rt_price_30m_map = rx_rt_price_30m_map.clone();
let mut rx5_rt_price_30m_map = rx_rt_price_30m_map.clone();
let mut rt_price_30m_map_capacity = rx_rt_price_30m_map.clone();
let mut rt_price_1d_map: HashMap<String, Vec<RealtimePriceData>> = HashMap::new();
let (tx_rt_price_1d_map, mut rx_rt_price_1d_map) = watch::channel(rt_price_1d_map);
let mut rx2_rt_price_1d_map = rx_rt_price_1d_map.clone();
let mut rx3_rt_price_1d_map = rx_rt_price_1d_map.clone();
let mut rx4_rt_price_1d_map = rx_rt_price_1d_map.clone();
let mut rt_price_1d_map_capacity = rx_rt_price_1d_map.clone();
let mut rt_price_1w_map: HashMap<String, Vec<RealtimePriceData>> = HashMap::new();
let (tx_rt_price_1w_map, mut rx_rt_price_1w_map) = watch::channel(rt_price_1w_map);
let mut rx2_rt_price_1w_map = rx_rt_price_1w_map.clone();
let mut rx3_rt_price_1w_map = rx_rt_price_1w_map.clone();
let mut rx4_rt_price_1w_map = rx_rt_price_1w_map.clone();
let mut rt_price_1w_map_capacity = rx_rt_price_1w_map.clone();
let mut rt_price_1mon_map: HashMap<String, Vec<RealtimePriceData>> = HashMap::new();
let (tx_rt_price_1mon_map, mut rx_rt_price_1mon_map) = watch::channel(rt_price_1mon_map);
let mut rx2_rt_price_1mon_map = rx_rt_price_1mon_map.clone();
let mut rx3_rt_price_1mon_map = rx_rt_price_1mon_map.clone();
let mut rx4_rt_price_1mon_map = rx_rt_price_1mon_map.clone();
let mut rt_price_1mon_map_capacity = rx_rt_price_1mon_map.clone();
// TEMA data
// let mut tema3_1m_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new(); // Vec<(symbol, Vec<(price, closetime)>)>
// let (tx_tema3_1m_data, mut rx_tema3_1m_data) = watch::channel(tema3_1m_data);
// let mut tema3_30m_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new();
// let (tx_tema3_30m_data, mut rx_tema3_30m_data) = watch::channel(tema3_30m_data);
// let mut tema3_1d_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new();
// let (tx_tema3_1d_data, mut rx_tema3_1d_data) = watch::channel(tema3_1d_data);
// let mut tema3_1w_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new();
// let (tx_tema3_1w_data, mut rx_tema3_1w_data) = watch::channel(tema3_1w_data);
// let mut tema3_1mon_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new();
// let (tx_tema3_1mon_data, mut rx_tema3_1mon_data) = watch::channel(tema3_1mon_data);
// let mut tema10_1m_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new(); // Vec<(symbol, Vec<(price, closetime)>)>
// let (tx_tema10_1m_data, mut rx_tema10_1m_data) = watch::channel(tema10_1m_data);
// let mut tema10_30m_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new();
// let (tx_tema10_30m_data, mut rx_tema10_30m_data) = watch::channel(tema10_30m_data);
// let mut tema10_1d_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new();
// let (tx_tema10_1d_data, mut rx_tema10_1d_data) = watch::channel(tema10_1d_data);
// let mut tema10_1w_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new();
// let (tx_tema10_1w_data, mut rx_tema10_1w_data) = watch::channel(tema10_1w_data);
// let mut tema10_1mon_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new();
// let (tx_tema10_1mon_data, mut rx_tema10_1mon_data) = watch::channel(tema10_1mon_data);
// let mut tema30_1m_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new(); // Vec<(symbol, Vec<(price, closetime)>)>
// let (tx_tema30_1m_data, mut rx_tema30_1m_data) = watch::channel(tema30_1m_data);
// let mut tema30_30m_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new();
// let (tx_tema30_30m_data, mut rx_tema30_30m_data) = watch::channel(tema30_30m_data);
// let mut tema30_1d_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new();
// let (tx_tema30_1d_data, mut rx_tema30_1d_data) = watch::channel(tema30_1d_data);
// let mut tema30_1w_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new();
// let (tx_tema30_1w_data, mut rx_tema30_1w_data) = watch::channel(tema30_1w_data);
// let mut tema30_1mon_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new();
// let (tx_tema30_1mon_data, mut rx_tema30_1mon_data) = watch::channel(tema30_1mon_data);
// Exchange Information data
let mut exchange_info_map: HashMap<String, ExchangeInfo> = HashMap::new();
let (tx_exchange_info_map, mut rx_exchange_info_map) = watch::channel(exchange_info_map);
let mut rx2_exchange_info_map = rx_exchange_info_map.clone();
let mut rx3_exchange_info_map = rx_exchange_info_map.clone();
let mut rx4_exchange_info_map = rx_exchange_info_map.clone();
let mut rx5_exchange_info_map = rx_exchange_info_map.clone();
let mut rx6_exchange_info_map = rx_exchange_info_map.clone();
let mut exchange_info_map_capacity = rx_exchange_info_map.clone();
{
// pre-fetching candle data: 30m, 1d, 1w and 1mon
print!("pre-fetching candle data: 30m, 1d, 1w and 1mon..");
std::io::stdout().flush();
let instant = Instant::now();
let interval_30m = String::from("30m");
let interval_1d = String::from("1d");
let interval_1w = String::from("1w");
let interval_1mon = String::from("1mon");
let mut candle_30m_map_temp: HashMap<String, Vec<CandleData>> = HashMap::new();
let mut candle_1d_map_temp: HashMap<String, Vec<CandleData>> = HashMap::new();
let mut candle_1w_map_temp: HashMap<String, Vec<CandleData>> = HashMap::new();
let mut candle_1mon_map_temp: HashMap<String, Vec<CandleData>> = HashMap::new();
request_candles::fetch_candle_parallel(&interval_30m, &mut candle_30m_map_temp).await;
request_candles::fetch_candle_parallel(&interval_1d, &mut candle_1d_map_temp).await;
request_candles::fetch_candle_parallel(&interval_1w, &mut candle_1w_map_temp).await;
request_candles::fetch_candle_parallel(&interval_1mon, &mut candle_1mon_map_temp).await;
tx_candle_30m_map.send_modify(|map| *map = candle_30m_map_temp);
tx_candle_1d_map.send_modify(|map| *map = candle_1d_map_temp);
tx_candle_1w_map.send_modify(|map| *map = candle_1w_map_temp);
tx_candle_1mon_map.send_modify(|map| *map = candle_1mon_map_temp);
// sleep as much as the loop recurs per 60 seconds, expecting child threads will have finished within 60 seconds.
println!("Ok..");
unsafe {
if RUNNING_MODE == REAL || RUNNING_MODE == SIMUL {
let mut elapsed_time = instant.elapsed().as_secs();
let mut remaining_time: u64 = 60 - elapsed_time;
while remaining_time > 0 && 60 > remaining_time {
print!("\rstart tradingbot in {:2} seconds", remaining_time);
io::stdout().flush();
elapsed_time = instant.elapsed().as_secs();
loop {
let temp_elapsed_time = instant.elapsed().as_secs();
let is_overflowed = remaining_time.overflowing_sub(1).1;
if is_overflowed == true {
break;
}
if elapsed_time < temp_elapsed_time {
remaining_time -= 1;
break;
}
}
}
println!("\nTradingbot is running");
}
}
}
// Task#0: monitoring tasks
tokio::task::spawn(async move {
loop {
let result = rx_task0.recv().await;
match result {
Some(_) => {
print!("\r{:2}", result.unwrap());
io::stdout().flush();
}
None => {}
}
sleep(Duration::from_millis(50)).await;
}
});
// Task#1: check server time and health
tokio::task::spawn(async move {
let mut prev_server_epoch: u128 = 0;
let mut epoch_difference: f64 = 0.0;
let mut epoch_mean: f64 = 0.0;
let mut elapsed_time: u128 = 0;
loop {
let instant = Instant::now();
let client = ClientBuilder::new()
.timeout(Duration::from_millis(1000))
.build()
.unwrap();
let mut usertime = UserTime::new();
let mut serverhealth = ServerHealth::new();
let mut epoch_difference_vec: Vec<f64> = Vec::new();
// let mut usertime_mutex = usertime_arc.lock().await;
// server_health_check_team::execute_server_health_check(&mut usertime_mutex).await;
// time_checking_team::execute_time_check(&mut usertime_mutex).await;
let result = server_health_check_team::execute_server_health_check(
&mut usertime,
&mut serverhealth,
&client,
)
.await;
while let Err(E) = time_checking_team::execute_time_check(&mut usertime, &client).await
{
sleep(Duration::from_millis((100))).await;
while let Err(e) = server_health_check_team::execute_server_health_check(
&mut usertime,
&mut serverhealth,
&client,
)
.await
{}
}
match result {
Ok(T) => {
local_epoch_tx1
.send(usertime.local_epoch)
.expect("local_epoch_tx1-local_epoch_rx1 channel has been closed.");
epoch_difference_tx1.send(usertime.epoch_difference).expect(
"epoch_difference_tx1-epoch_difference_rx1 channel has been closed.",
);
local_epoch_tx2
.send(usertime.local_epoch)
.expect("local_epoch_tx2-local_epoch_rx2 channel has been closed.");
epoch_difference_tx2.send(usertime.epoch_difference).expect(
"epoch_difference_tx2-epoch_difference_rx2 channel has been closed.",
);
tx_task1.send(1).expect("The mpsc channel has been closed.");
}
Err(E) => {}
}
// sleep as much as the loop recurs per 1 second if all operation finished within 1 second.
// considering RTT and to make the wait time 1 second, latency each second for every 30 seconds is averaged and subtracted with wait time.
elapsed_time = instant.elapsed().as_millis();
if 1_000 > elapsed_time {
if prev_server_epoch != 0 {
epoch_mean = 0.0;
epoch_difference =
(usertime.server_epoch as f64) - (prev_server_epoch as f64) - 1000.0; // milli second
if epoch_difference > 300.0 || epoch_difference < -300.0 {
epoch_difference_vec.clear();
epoch_difference = 0.0;
}
if epoch_difference_vec.len() == 10 {
epoch_difference_vec.rotate_left(1);
epoch_difference_vec.pop();
epoch_difference_vec.push(epoch_difference);
} else if epoch_difference_vec.len() < 10 {
epoch_difference_vec.push(epoch_difference);
}
for element in &epoch_difference_vec {
epoch_mean += *element;
}
epoch_mean = epoch_mean / (epoch_difference_vec.len() as f64);
}
if epoch_mean > 0.0 {
if elapsed_time > ((epoch_mean) as u128)
&& 1_000 > elapsed_time + ((epoch_mean) as u128)
{
sleep(Duration::from_millis(
(1_000 - elapsed_time - ((epoch_mean) as u128)) as u64,
))
.await;
} else {
sleep(Duration::from_millis((1_000 - elapsed_time) as u64)).await;
}
} else {
sleep(Duration::from_millis(
(1_000 - elapsed_time + ((epoch_mean.abs()) as u128)) as u64,
))
.await;
}
}
prev_server_epoch = usertime.server_epoch;
}
});
// Task#2: request trade fee
tokio::task::spawn(async move {
let mut elapsed_time = 0;
loop {
let instant = Instant::now();
let client = ClientBuilder::new()
.timeout(tokio::time::Duration::from_millis(3000))
.build()
.unwrap();
let tx1_changed = local_epoch_rx1.changed().await;
let tx2_changed = epoch_difference_rx1.changed().await;
let local_epoch = *local_epoch_rx1.borrow();
let difference_epoch = *epoch_difference_rx1.borrow();
match tx1_changed {
Ok(T) => match tx2_changed {
Ok(T) => {
let mut tradefee_vec_temp: HashMap<String, TradeFee> = HashMap::new();
loop {
let result = request_others::request_trade_fee(
API_KEY,
SECRET_KEY,
local_epoch,
difference_epoch,
&client,
&mut tradefee_vec_temp,
)
.await;
if tradefee_vec_temp.len() == 0 {
sleep(Duration::from_secs(3)).await;
} else {
break;
}
}
tx_tradefee_map.send_modify(|vec| *vec = tradefee_vec_temp);
tx_task2.send(2).expect("The mpsc channel has been closed.");
}
Err(E) => {
panic!("tx2-rx2 channel has been closed.")
}
},
Err(E) => {
panic!("tx1-rx1 channel has been closed.")
}
}
// sleep as much as the loop recurs per 60 seconds if all operation finished within 60 seconds.
elapsed_time = instant.elapsed().as_millis();
if 60_000 > elapsed_time {
sleep(Duration::from_millis((60_000 - elapsed_time) as u64)).await;
}
}
});
// Task#3: request lot stepsize and ticksize
tokio::task::spawn(async move {
loop {
let client = ClientBuilder::new()
.timeout(tokio::time::Duration::from_millis(3000))
.build()
.unwrap();
let mut exchange_info_map_temp: HashMap<String, ExchangeInfo> = HashMap::new();
let mut result;
loop {
result = coin_health_check_team::request_others::request_exchange_infomation(
&client,
&mut exchange_info_map_temp,
)
.await;
// retry
if exchange_info_map_temp.len() == 0 {
sleep(Duration::from_secs(3)).await;
} else {
break;
}
}
tx_exchange_info_map.send_modify(|vec| *vec = exchange_info_map_temp);
tx_task3.send(3).expect("The mpsc channel has been closed.");
sleep(Duration::from_secs(300)).await; // sleep for 5 mins
}
});
// Task#4: request 24h price changes,
// pick valid USDT Trades,
// filtering stop USDT Trades,
// monitor total_24h_change_profit_index,
// usdt_24h_change_profit_index,
// total_price_down_dist_index
tokio::task::spawn(async move {
sleep(Duration::from_secs(10)).await;
let mut elapsed_time = 0;
loop {
let instant = Instant::now();
let client = ClientBuilder::new()
.timeout(tokio::time::Duration::from_millis(3000))
.build()
.unwrap();
let result = request_others::request_24hr_ticker_price_change_statistics(&client).await;
match result {
Ok(T) => {
let exchange_info_map = rx5_exchange_info_map.borrow().clone();
let mut valid_usdt_trade_set_temp: HashSet<String> = HashSet::new();
monitors::collect_valid_usde_trade(
&mut valid_usdt_trade_set_temp,
&exchange_info_map,
)
.await;
tx_valid_usdt_trade_set.send_modify(|vec| *vec = valid_usdt_trade_set_temp);
tx_task4.send(4).expect("The mpsc channel has been closed.");
}
Err(E) => {
log::warn!(">>> Failed to monitor usdt_24h_change_profit_index.");
}
}
// match result {
// Ok(T) => {
// let result = monitors::total_24h_change_profit_index().await;
// match result {
// Ok(T) => {},
// Err(E) => {
// log::warn!(">>> Failed to monitor total_24h_change_profit_index.");
// }
// };
// let result = monitors::usdt_24h_change_profit_index().await;
// match result {
// Ok(T) => {},
// Err(E) => {
// log::warn!(">>> Failed to monitor usdt_24h_change_profit_index.");
// }
// };
// let result = monitors::total_price_down_dist_index().await;
// match result {
// Ok(T) => {},
// Err(E) => {
// log::warn!(">>> Failed to monitor total_price_down_dist_index.");
// }
// };
// },
// Err(E) => {
// log::warn!(">>> Failed to fetch 24h price change data from endpoint or parse message.");
// }
// };
// sleep as much as the loop recurs per 30 seconds if all operation finished within 30 seconds.
elapsed_time = instant.elapsed().as_millis();
if 30_000 > elapsed_time {
sleep(Duration::from_millis((30_000 - elapsed_time) as u64)).await;
}
}
});
// Task#5: price per second
tokio::task::spawn(async move {
sleep(Duration::from_secs(20)).await;
let mut elapsed_time = 0;
loop {
let instant = Instant::now();
let client = ClientBuilder::new()
.timeout(tokio::time::Duration::from_millis(1000))
.build()
.unwrap();
let mut price_vec_temp: HashMap<String, f64> = HashMap::new();
let mut price_vec_temp_c: HashMap<String, f64> = HashMap::new();
request_others::request_all_coin_price(&client, &mut price_vec_temp).await;
price_vec_temp_c = price_vec_temp.clone();
tx_price_map.send_modify(|vec| *vec = price_vec_temp);
tx_task5.send(5).expect("The mpsc channel has been closed.");
// Task#10: make realtime price data of 1m, 30m, 1d, 1w and 1mon
let valid_usdt_trade_set = rx_valid_usdt_trade_set.borrow().clone();
// 1m
let interval = String::from("1m");
let candle_1m_vec = rx_candle_1m_map.borrow().clone();
let dummy_data: HashMap<String, Vec<RealtimePriceData>> = HashMap::new();
let mut rt_price_1m_map_write_temp: HashMap<String, Vec<RealtimePriceData>> =
HashMap::new();
let mut rt_price_1m_map_write_temp_c: HashMap<String, Vec<RealtimePriceData>> =
HashMap::new();
let result = value_estimation_team::datapoints::price_data::update_realtime_price_data(
&interval,
&candle_1m_vec,
&dummy_data,
&mut rt_price_1m_map_write_temp,
&price_vec_temp_c,
&valid_usdt_trade_set,
)
.await;
rt_price_1m_map_write_temp_c = rt_price_1m_map_write_temp.clone();
if tx_rt_price_1m_map.is_closed() {
log::error!("tx_rt_price_1m_vec has been closed!");
} else {
tx_rt_price_1m_map.send_modify(|vec| *vec = rt_price_1m_map_write_temp);
}
// 30m
let interval = String::from("30m");
let candle_30m_map = rx_candle_30m_map.borrow().clone();
let mut rt_price_30m_map_write_temp: HashMap<String, Vec<RealtimePriceData>> =
HashMap::new();
let mut rt_price_30m_map_write_temp_c: HashMap<String, Vec<RealtimePriceData>> =
HashMap::new();
if !rt_price_1m_map_write_temp_c.is_empty() {
let result =
value_estimation_team::datapoints::price_data::update_realtime_price_data(
&interval,
&candle_30m_map,
&rt_price_1m_map_write_temp_c,
&mut rt_price_30m_map_write_temp,
&price_vec_temp_c,
&valid_usdt_trade_set,
)
.await;
rt_price_30m_map_write_temp_c = rt_price_30m_map_write_temp.clone();
if tx_rt_price_30m_map.is_closed() {
log::error!("tx_rt_price_30m_vec has been closed!");
} else {
tx_rt_price_30m_map.send_modify(
|map: &mut HashMap<String, Vec<RealtimePriceData>>| {
*map = rt_price_30m_map_write_temp
},
);
}
}
// 1d
let interval = String::from("1d");
let candle_1d_vec = rx_candle_1d_map.borrow().clone();
let mut rt_price_1d_map_write_temp: HashMap<String, Vec<RealtimePriceData>> =
HashMap::new();
if !rt_price_30m_map_write_temp_c.is_empty() {
let result =
value_estimation_team::datapoints::price_data::update_realtime_price_data(
&interval,
&candle_1d_vec,
&rt_price_30m_map_write_temp_c,
&mut rt_price_1d_map_write_temp,
&price_vec_temp_c,
&valid_usdt_trade_set,
)
.await;
match result {
Ok(T) => {
if tx_rt_price_1d_map.is_closed() {
log::error!("tx_rt_price_1d_vec has been closed!");
} else {
tx_rt_price_1d_map.send_modify(|map| *map = rt_price_1d_map_write_temp);
}
}
Err(E) => {}
}
}
// { // 1w
// let interval = String::from("1w");
// let candle_1w_vec = rx_candle_1w_vec.borrow().clone();
// let rt_price_1d_vec = rx_rt_price_1d_vec.borrow().clone();
// let mut rt_price_1w_vec_write_temp: Vec<(String, Vec<RealtimePriceData>)> = Vec::new();
// let result = value_estimation_team::datapoints::price_data::update_realtime_price_data(&interval, &candle_1w_vec, &rt_price_1d_vec, &mut rt_price_1w_vec_write_temp, &price_vec_temp_c, &valid_usdt_trade_vec).await;
// match result {
// Ok(T) => {
// tx_rt_price_1w_vec.send_modify(|vec| *vec = rt_price_1w_vec_write_temp);
// tx_task10.send(10).expect("The mpsc channel has been closed.");
// }
// Err(E) => {}
// }
// }
// { // 1mon
// let interval = String::from("1mon");
// let candle_1mon_vec = rx_candle_1mon_vec.borrow().clone();
// let rt_price_1w_vec = rx_rt_price_1w_vec.borrow().clone();
// let mut rt_price_1mon_vec_write_temp: Vec<(String, Vec<RealtimePriceData>)> = Vec::new();
// let result = value_estimation_team::datapoints::price_data::update_realtime_price_data(&interval, &candle_1mon_vec, &rt_price_1w_vec, &mut rt_price_1mon_vec_write_temp, &price_vec_temp_c, &valid_usdt_trade_vec).await;
// match result {
// Ok(T) => {
// tx_rt_price_1mon_vec.send_modify(|vec| *vec = rt_price_1mon_vec_write_temp);
// tx_task10.send(10).expect("The mpsc channel has been closed.");
// }
// Err(E) => {}
// }
// }
// sleep as much as the loop recurs per 1 seconds if all operation finished within 1 seconds.
elapsed_time = instant.elapsed().as_millis();
if 500 > elapsed_time {
sleep(Duration::from_millis((500 - elapsed_time) as u64)).await;
}
}
});
// Task#6: fetch candle 1m
tokio::task::spawn(async move {
let mut elapsed_time = 0;
let interval = String::from("1m");
loop {
let instant = Instant::now();
let mut candle_1m_map_temp: HashMap<String, Vec<CandleData>> = HashMap::new();
let result =
request_candles::fetch_candle_parallel(&interval, &mut candle_1m_map_temp).await;
match result {
Ok(T) => {
tx_candle_1m_map.send_modify(|vec| *vec = candle_1m_map_temp);
tx_task6.send(6).expect("The mpsc channel has been closed.");
}
Err(E) => {}
}
// sleep as much as the loop recurs per 10 seconds, expecting child threads will have finished within 10 seconds.
elapsed_time = instant.elapsed().as_millis();
if 30_000 > elapsed_time {
// 10_000 for major trade
sleep(Duration::from_millis((30_000 - elapsed_time) as u64)).await;
}
}
});
// Task#7: fetch candle 30m
tokio::task::spawn(async move {
let mut elapsed_time = 0;
let interval = String::from("30m");
loop {
let instant = Instant::now();
let mut candle_30m_map_temp: HashMap<String, Vec<CandleData>> = HashMap::new();
let result =
request_candles::fetch_candle_delay(&interval, &mut candle_30m_map_temp).await;
// request_candles::fetch_candle_parallel(&interval, &mut candle_30m_vec_temp).await;
match result {
Ok(T) => {
tx_candle_30m_map.send_modify(|vec| *vec = candle_30m_map_temp);
tx_task7.send(7).expect("The mpsc channel has been closed.");
}
Err(E) => {}
}
// sleep as much as the loop recurs per 60 seconds, expecting child threads will have finished within 60 seconds.
elapsed_time = instant.elapsed().as_millis();
if 60_000 > elapsed_time {
//60_000
sleep(Duration::from_millis((60_000 - elapsed_time) as u64)).await;
}
}
});
// Task#8: fetch candle 1d
tokio::task::spawn(async move {
let mut elapsed_time = 0;
let interval = String::from("1d");
sleep(Duration::from_secs(600)).await;
loop {
let instant = Instant::now();
let mut candle_1d_map_temp: HashMap<String, Vec<CandleData>> = HashMap::new();
let result =
request_candles::fetch_candle_delay(&interval, &mut candle_1d_map_temp).await;
match result {
Ok(T) => {
tx_candle_1d_map.send_modify(|vec| *vec = candle_1d_map_temp);
tx_task8.send(8).expect("The mpsc channel has been closed.");
}
Err(E) => {}
}
// sleep as much as the loop recurs per 1800 seconds, expecting child threads will have finished within 1800 seconds.
elapsed_time = instant.elapsed().as_secs();
if 1_800 > elapsed_time {
sleep(Duration::from_secs((1_800 - elapsed_time) as u64)).await;
}
}
});
// // Task#9: fetch candle 1w
// tokio::task::spawn(async move{
// let interval = String::from("1w");
// sleep(Duration::from_secs(600)).await;
// loop{
// let mut candle_1w_vec_temp: Vec<(String, Vec<CandleData>)> = Vec::new();
// let result = request_candles::fetch_candle_delay(&interval, &mut candle_1w_vec_temp).await;
// match result {
// Ok(T) => {
// tx_candle_1w_vec.send_modify(|vec| *vec = candle_1w_vec_temp);
// tx_task9.send(9).expect("The mpsc channel has been closed.");
// }
// Err(E) => {}
// }
// }
// });
// // Task#10: fetch candle 1mon
// tokio::task::spawn(async move{
// let interval = String::from("1mon");
// sleep(Duration::from_secs(600)).await;
// loop{
// let mut candle_1mon_vec_temp: Vec<(String, Vec<CandleData>)> = Vec::new();
// let result = request_candles::fetch_candle_delay(&interval, &mut candle_1mon_vec_temp).await;
// match result {
// Ok(T) => {
// tx_candle_1mon_vec.send_modify(|vec| *vec = candle_1mon_vec_temp);
// tx_task10.send(10).expect("The mpsc channel has been closed.");
// }
// Err(E) => {}
// }
// }
// });
// Task#11: monitoring total market cap
// if RUNNING_MODE == REAL || RUNNING_MODE == SIMUL || RUNNING_MODE == TEST {
// tokio::task::spawn(async move {
// loop {
// let result = signal_association::coinmarketcap::market_cap_index().await;
// match result {
// Ok(T) => {
// tx_task11
// .send(11)
// .expect("The mpsc channel has been closed.");
// }
// Err(E) => {}
// }
// }
// });
// }
// Task#12: monitoring foreign exchange rate
// if RUNNING_MODE == REAL || RUNNING_MODE == SIMUL {
// tokio::task::spawn(async move {
// loop {
// let result = signal_association::exchange_rate::monitoring_fx_rate_index().await;
// match result {
// Ok(T) => {
// tx_task12
// .send(12)
// .expect("The mpsc channel has been closed.");
// }
// Err(E) => {}
// }
// sleep(Duration::from_secs(2700)).await; // sleep for 45mins
// }
// });
// }
// // // Task#13: monitoring dollar index
// // tokio::task::spawn(async move {
// // loop
// // {
// // let result = signal_association::dollar_index::monitoring_dollar_index().await;
// // match result {
// // Ok(T) => {
// // tx_task13.send(13).expect("The mpsc channel has been closed.");
// // }
// // Err(E) => {}
// // }
// // sleep(Duration::from_secs(1800)).await; // sleep for 30mins
// // }
// // });
// Task#14: monitoring signal decision
// if RUNNING_MODE == REAL || RUNNING_MODE == SIMUL || RUNNING_MODE == TEST {
// tokio::task::spawn(async move {
// loop {
// let result =
// signal_association::signal_decision::monitoring_signal_decision().await;
// match result {
// Ok(T) => {
// tx_task14
// .send(14)
// .expect("The mpsc channel has been closed.");
// }
// Err(E) => {}
// }
// sleep(Duration::from_millis(500)).await; // sleep for 0.5sec
// }
// });
// }
// Task#15: monitoring future ratio
// if RUNNING_MODE == REAL || RUNNING_MODE == SIMUL || RUNNING_MODE == TEST {
// tokio::task::spawn(async move {
// loop {
// let result = signal_association::future_ratio::monitoring_future_ratio().await;
// match result {
// Ok(T) => {
// tx_task15
// .send(15)
// .expect("The mpsc channel has been closed.");
// }
// Err(E) => {}
// }
// sleep(Duration::from_secs(60)).await; // sleep for 1min
// }
// });
// }
// COEX part
// Task#16: execute strategis for buy
unsafe {
if RUNNING_MODE == REAL || RUNNING_MODE == SIMUL {
tokio::task::spawn(async move {
sleep(Duration::from_secs(40)).await;
let mut elapsed_time = 0;
loop {
let instant = Instant::now();
let mut all_data = AllData::new();
all_data.valid_symbol_vec = rx3_valid_usdt_trade_set.borrow().clone();
// realtime price data
all_data.rt_price_1m_vec = rx3_rt_price_1m_map.borrow().clone();
all_data.rt_price_30m_vec = rx3_rt_price_30m_map.borrow().clone();
all_data.rt_price_1d_vec = rx3_rt_price_1d_map.borrow().clone();
all_data.rt_price_1w_vec = rx3_rt_price_1w_map.borrow().clone();
all_data.rt_price_1mon_vec = rx3_rt_price_1mon_map.borrow().clone();
// future exchange info
let futures_exchange_info = rx3_futures_exchange_info.borrow().clone();
let result =
strategy_team::strategy_manager::execute_list_up_for_buy(&all_data, &futures_exchange_info).await;
match result {
Ok(T) => {
tx_task16
.send(16)
.expect("The mpsc channel has been closed.");
}
Err(E) => {
// log::error!("Couldn't execute strategists.");
}
}
// sleep as much as the loop recurs per 1 second if all operation finished within 1 second.
elapsed_time = instant.elapsed().as_millis();
if 250 > elapsed_time {
sleep(Duration::from_millis((250 - elapsed_time) as u64)).await;
}
}
});
} else {
tokio::task::spawn(async move {
sleep(Duration::from_secs(10)).await;
let mut elapsed_time = 0;
let mut all_data = AllData::new();
loop {
let instant = Instant::now();
all_data.valid_symbol_vec = rx3_valid_usdt_trade_set.borrow().clone();
// realtime price data
all_data.rt_price_1m_vec = rx3_rt_price_1m_map.borrow().clone();
all_data.rt_price_30m_vec = rx3_rt_price_30m_map.borrow().clone();
all_data.rt_price_1d_vec = rx3_rt_price_1d_map.borrow().clone();
all_data.rt_price_1w_vec = rx3_rt_price_1w_map.borrow().clone();
all_data.rt_price_1mon_vec = rx3_rt_price_1mon_map.borrow().clone();
// let result = coex::strategy_team::execute_strategist_for_test(&all_data).await;
// match result {
// Ok(T) => {
// tx_task16
// .send(16)
// .expect("The mpsc channel has been closed.");
// }
// Err(E) => {}
// }
// sleep as much as the loop recurs per 1 second if all operation finished within 1 second.
elapsed_time = instant.elapsed().as_millis();
if 250 > elapsed_time {
sleep(Duration::from_millis((250 - elapsed_time) as u64)).await;
}
}
});
}
}
// Task#17: execute strategis for sell
unsafe {
if RUNNING_MODE == REAL || RUNNING_MODE == SIMUL {
tokio::task::spawn(async move {
sleep(Duration::from_secs(40)).await;
let mut elapsed_time = 0;
loop {
let instant = Instant::now();
let mut all_data = AllData::new();
let mut exchange_info_map: HashMap<String, ExchangeInfo> = HashMap::new();
let mut trade_fee_map: HashMap<String, TradeFee> = HashMap::new();
all_data.valid_symbol_vec = rx4_valid_usdt_trade_set.borrow().clone();
exchange_info_map = rx6_exchange_info_map.borrow().clone();
trade_fee_map = rx5_tradefee_map.borrow().clone();
// realtime price data
all_data.rt_price_1m_vec = rx5_rt_price_1m_map.borrow().clone();
all_data.rt_price_30m_vec = rx5_rt_price_30m_map.borrow().clone();
all_data.rt_price_1d_vec = rx4_rt_price_1d_map.borrow().clone();
all_data.rt_price_1w_vec = rx4_rt_price_1w_map.borrow().clone();
all_data.rt_price_1mon_vec = rx4_rt_price_1mon_map.borrow().clone();
let futures_exchange_info = rx4_futures_exchange_info.borrow().clone();
let result = strategy_team::strategy_manager::execute_list_up_for_sell(
&all_data,
&exchange_info_map,
&futures_exchange_info,
&trade_fee_map,
)
.await;
match result {
Ok(T) => {
tx_task17
.send(17)
.expect("The mpsc channel has been closed.");
}
Err(E) => {
// log::error!("Couldn't execute strategists.");
}
}
// sleep as much as the loop recurs per 1 second if all operation finished within 1 second.
elapsed_time = instant.elapsed().as_millis();
if 250 > elapsed_time {
sleep(Duration::from_millis((250 - elapsed_time) as u64)).await;
}
}
});
}
}
// Task#18: monitoring pre-suggested coins
tokio::task::spawn(async move {
sleep(Duration::from_secs(30)).await;
let mut elapsed_time = 0;
loop {
let instant = Instant::now();
let coin_price_vec = rx3_price_map.borrow().clone();
let result = coex::exchange_team::monitoring_pre_suggested_coins(&coin_price_vec).await;
// send Task#0 a message to notify running on
match result {
Ok(T) => {
tx_task18
.send(18)
.expect("The mpsc channel has been closed.");
}
Err(E) => {}
}
// sleep as much as the loop recurs per 1 second if all operation finished within 1 second.
elapsed_time = instant.elapsed().as_millis();
if 100 > elapsed_time {
sleep(Duration::from_millis((100 - elapsed_time) as u64)).await;
}
}
});
// Task#19: buy_coin
tokio::task::spawn(async move {
sleep(Duration::from_secs(30)).await;
let mut elapsed_time = 0;
loop {
let instant = Instant::now();
let exchange_info_map = rx_exchange_info_map.borrow().clone();
let trade_fee_map = rx_tradefee_map.borrow().clone();
let result = coex::exchange_team::buy_coin(&exchange_info_map, &trade_fee_map).await;
// send Task#0 a message to notify running on
match result {
Ok(T) => {
tx_task19
.send(19)
.expect("The mpsc channel has been closed.");
}
Err(E) => {
eprint!("Error: {:?}", E);
}
}
// sleep as much as the loop recurs per 200ms second if all operation finished within 200ms
elapsed_time = instant.elapsed().as_millis();
if 200 > elapsed_time {
sleep(Duration::from_millis((200 - elapsed_time) as u64)).await;
}
}
});
// Task#20: monitoring_open_buy_order
unsafe {
if RUNNING_MODE == REAL || RUNNING_MODE == TEST {
tokio::task::spawn(async move {
sleep(Duration::from_secs(30)).await;
let mut elapsed_time = 0;
loop {
let instant = Instant::now();
let client = ClientBuilder::new()
.timeout(tokio::time::Duration::from_millis(3000))
.build()
.unwrap();
let trade_fee_map = rx2_tradefee_map.borrow().clone();
let result =
coex::order_team::monitoring_open_buy_order(&client, &trade_fee_map).await;
// send Task#0 a message to notify running on
match result {
Ok(T) => {
tx_task20
.send(20)
.expect("The mpsc channel has been closed.");
}
Err(E) => {}
}
// sleep as much as the loop recurs per 1 second if all operation finished within 1 second.
elapsed_time = instant.elapsed().as_millis();
if 200 > elapsed_time {
sleep(Duration::from_millis((200 - elapsed_time) as u64)).await;
}
}
});
}
}
// Task#21: update_price_of_filled_buy_order
tokio::task::spawn(async move {
sleep(Duration::from_secs(30)).await;
let mut elapsed_time = 0;
loop {
let instant = Instant::now();
let coin_price_vec = rx5_price_map.borrow().clone();
let exchange_info_vec = rx2_exchange_info_map.borrow().clone();
let trade_fee_vec = rx3_tradefee_map.borrow().clone();
let result = coex::order_team::update_price_of_filled_buy_order(
&coin_price_vec,
&exchange_info_vec,
&trade_fee_vec,
)
.await;
// send Task#0 a message to notify running on
match result {
Ok(T) => {
tx_task21
.send(21)
.expect("The mpsc channel has been closed.");
}
Err(E) => {}
}
// sleep as much as the loop recurs per 1 second if all operation finished within 1 second.
elapsed_time = instant.elapsed().as_millis();
if 250 > elapsed_time {
sleep(Duration::from_millis((250 - elapsed_time) as u64)).await;
}
}
});
// Task#22: monitoring_open_sell_order
unsafe {
if RUNNING_MODE == REAL || RUNNING_MODE == TEST {
tokio::task::spawn(async move {
sleep(Duration::from_secs(30)).await;
let mut elapsed_time = 0;
loop {
let instant = Instant::now();
let client = ClientBuilder::new()
.timeout(tokio::time::Duration::from_millis(3000))
.build()
.unwrap();
let exchange_info_vec = rx4_exchange_info_map.borrow().clone();
let trade_fee_vec = rx4_tradefee_map.borrow().clone();
let result = coex::order_team::monitoring_open_sell_order(
&client,
&exchange_info_vec,
&trade_fee_vec,
)
.await;
// send Task#0 a message to notify running on
match result {
Ok(T) => {
tx_task22
.send(22)
.expect("The mpsc channel has been closed.");
}
Err(E) => {}
}
// sleep as much as the loop recurs per 1 second if all operation finished within 1 second.
elapsed_time = instant.elapsed().as_millis();
if 250 > elapsed_time {
sleep(Duration::from_millis((250 - elapsed_time) as u64)).await;
}
}
});
}
}
// Task#23: monitoring_filled_sell_order
tokio::task::spawn(async move {
let mut elapsed_time = 0;
loop {
let instant = Instant::now();
let client = ClientBuilder::new()
.timeout(tokio::time::Duration::from_millis(3000))
.build()
.unwrap();
let result = coex::order_team::monitoring_filled_sell_order(&client).await;
// send Task#0 a message to notify running on
match result {
Ok(T) => {
tx_task23
.send(23)
.expect("The mpsc channel has been closed.");
}
Err(E) => {}
}
// sleep as much as the loop recurs per 1 second if all operation finished within 1 second.
elapsed_time = instant.elapsed().as_millis();
if 200 > elapsed_time {
sleep(Duration::from_millis((200 - elapsed_time) as u64)).await;
}
}
});
// Task#24: monitoring_scoreboard
tokio::task::spawn(async move {
let mut elapsed_time = 0;
loop {
let instant = Instant::now();
let mut all_data = AllData::new();
// realtime price data
all_data.rt_price_1m_vec = rx4_rt_price_1m_map.borrow().clone();
let result = coex::exchange_team::monitoring_scoreboard(&all_data).await;
// send Task#0 a message to notify running on
match result {
Ok(T) => {
tx_task24
.send(24)
.expect("The mpsc channel has been closed.");
}
Err(E) => {}
}
// sleep as much as the loop recurs per 1 second if all operation finished within 1 second.
elapsed_time = instant.elapsed().as_millis();
if 200 > elapsed_time {
sleep(Duration::from_millis((200 - elapsed_time) as u64)).await;
}
}
});
// Task#25: update current_total_usdt and available_usdt
tokio::task::spawn(async move {
let mut previous_result = false;
let mut elapsed_time = 0;
loop {
let instant = Instant::now();
let client = ClientBuilder::new()
.timeout(tokio::time::Duration::from_millis(3000))
.build()
.unwrap();
let result =
coex::assets_managing_team::monitoring_asset_usdt(&mut previous_result, &client)
.await;
// send Task#0 a message to notify running on
match result {
Ok(T) => {
tx_task25
.send(25)
.expect("The mpsc channel has been closed.");
}
Err(E) => {
log::error!("{}", E);
}
}
// sleep as much as the loop recurs per 300ms if all operation finished within 300ms.
elapsed_time = instant.elapsed().as_millis();
if 250 > elapsed_time {
sleep(Duration::from_millis((250 - elapsed_time) as u64)).await;
}
}
});
// Task#26: update kelly_criterion
tokio::task::spawn(async move {
let mut elapsed_time = 0;
loop {
let instant = Instant::now();
let result = coex::assets_managing_team::update_kelly_criterion().await;
// send Task#0 a message to notify running on
match result {
Ok(T) => {
tx_task26
.send(26)
.expect("The mpsc channel has been closed.");
}
Err(E) => {
log::error!("{}", E);
}
}
// sleep as much as the loop recurs per 1 minutes
elapsed_time = instant.elapsed().as_secs();
if 60 > elapsed_time {
sleep(Duration::from_secs((60 - elapsed_time) as u64)).await;
}
}
});
// Task#27: request delist symbols
tokio::task::spawn(async move {
let mut elapsed_time = 0;
loop {
let instant = Instant::now();
let client = ClientBuilder::new()
.timeout(tokio::time::Duration::from_millis(3000))
.build()
.unwrap();
let tx1_changed = local_epoch_rx2.changed().await;
let tx2_changed = epoch_difference_rx2.changed().await;
let local_epoch = *local_epoch_rx2.borrow();
let difference_epoch = *epoch_difference_rx2.borrow();
let result = request_others::request_delist_symbols(
API_KEY,
SECRET_KEY,
local_epoch,
difference_epoch,
&client,
)
.await;
tx_task27
.send(27)
.expect("The mpsc channel has been closed.");
// sleep as much as the loop recurs per 300 seconds if all operation finished within 300 seconds.
elapsed_time = instant.elapsed().as_secs();
if 300 > elapsed_time {
sleep(Duration::from_secs((300 - elapsed_time) as u64)).await;
}
}
});
// Futures Section
// Task#XX: get future last price
tokio::task::spawn(async move {
sleep(Duration::from_secs(20)).await;
let mut elapsed_time = 0;
loop {
let instant = Instant::now();
let client = ClientBuilder::new()
.timeout(tokio::time::Duration::from_millis(1000))
.build()
.unwrap();
let mut future_price_map_temp: HashMap<String, f64> = HashMap::new();
future::order::get_last_price(&client, &mut future_price_map_temp).await;
if future_price_map_temp.len() != 0 {
tx_future_price_map.send_modify(|vec| *vec = future_price_map_temp);
}
}
if 333 > elapsed_time {
sleep(Duration::from_millis((333 - elapsed_time) as u64)).await;
}
});
// Task#XX: get future exchange information
tokio::task::spawn(async move {
sleep(Duration::from_secs(20)).await;
let client = ClientBuilder::new()
.timeout(Duration::from_millis(1000))
.build()
.unwrap();
let mut elapsed_time = 0;
loop {
let instant = Instant::now();
let mut futures_exchange_info_map_temp: HashMap<String, FuturesExchangeInfo> = HashMap::new();
let result = future::order::request_future_exchange_infomation(
&client,
&mut futures_exchange_info_map_temp,
)
.await;
if tx_futures_exchange_info.is_closed() {
log::error!("tx_futures_exchange_info has been closed!");
} else {
if futures_exchange_info_map_temp.len() != 0 {
tx_futures_exchange_info.send_modify(|map| *map = futures_exchange_info_map_temp);
}
}
// send Task#0 a message to notify running on
match result {
Ok(T) => {
}
Err(E) => {}
}
// sleep as much as the loop recurs per 10 second if all operation finished within 10 second.
elapsed_time = instant.elapsed().as_millis();
if 10000 > elapsed_time {
sleep(Duration::from_millis((10000 - elapsed_time) as u64)).await;
}
}
});
// Task#XX: get futures trade fee and available balance(USDT)
tokio::task::spawn(async move {
sleep(Duration::from_secs(3)).await;
let mut elapsed_time = 0;
loop {
let instant = Instant::now();
let client = ClientBuilder::new()
.timeout(tokio::time::Duration::from_millis(1000))
.build()
.unwrap();
let mut futures_trade_fee_temp = FuturesTradeFee::new();
future::table_mgmt::get_tradefee_balance(&mut futures_trade_fee_temp, &client).await;
tx_futures_trade_fee.send_modify(|vec| *vec = futures_trade_fee_temp);
}
if 2000 > elapsed_time {
sleep(Duration::from_millis((2000 - elapsed_time) as u64)).await;
}
});
// Task#XX: update price of filled positions
tokio::task::spawn(async move {
sleep(Duration::from_secs(20)).await;
let mut elapsed_time = 0;
loop {
let instant = Instant::now();
let coin_price_map = rx_future_price_map.borrow().clone();
let futures_exchange_info_map = rx_futures_exchange_info.borrow().clone();
let future_trade_fee = rx_futures_trade_fee.borrow().clone();
let result = future::table_mgmt::update_price_of_filled_positions(
&coin_price_map,
&futures_exchange_info_map,
&future_trade_fee,
)
.await;
// send Task#0 a message to notify running on
match result {
Ok(T) => {
}
Err(E) => {}
}
// sleep as much as the loop recurs per 1 second if all operation finished within 1 second.
elapsed_time = instant.elapsed().as_millis();
if 100 > elapsed_time {
sleep(Duration::from_millis((100 - elapsed_time) as u64)).await;
}
}
});
// Task#XX: monitoring ordered positions
tokio::task::spawn(async move {
sleep(Duration::from_secs(20)).await;
let mut elapsed_time = 0;
loop {
let instant = Instant::now();
let client = ClientBuilder::new()
.timeout(tokio::time::Duration::from_millis(1000))
.build()
.unwrap();
let future_trade_fee = rx2_futures_trade_fee.borrow().clone();
let result = future::order::monitoring_unfilled_order(
&client,
&future_trade_fee,
)
.await;
// send Task#0 a message to notify running on
match result {
Ok(T) => {
}
Err(E) => {}
}
// sleep as much as the loop recurs per 1 second if all operation finished within 1 second.
elapsed_time = instant.elapsed().as_millis();
if 100 > elapsed_time {
sleep(Duration::from_millis((100 - elapsed_time) as u64)).await;
}
}
});
// Task#XX: monitoring ordered positions
tokio::task::spawn(async move {
sleep(Duration::from_secs(20)).await;
let mut elapsed_time = 0;
loop {
let instant = Instant::now();
let coin_price_map = rx2_future_price_map.borrow().clone();
let futures_exchange_info_map = rx2_futures_exchange_info.borrow().clone();
let future_trade_fee = rx3_futures_trade_fee.borrow().clone();
let result = future::order::entry_position(
&coin_price_map,
&futures_exchange_info_map,
&future_trade_fee,
)
.await;
// send Task#0 a message to notify running on
match result {
Ok(T) => {
}
Err(E) => {}
}
// sleep as much as the loop recurs per 1 second if all operation finished within 1 second.
elapsed_time = instant.elapsed().as_millis();
if 50 > elapsed_time {
sleep(Duration::from_millis((50 - elapsed_time) as u64)).await;
}
}
});
// Task#XX: move closed positions
tokio::task::spawn(async move {
sleep(Duration::from_secs(20)).await;
let mut elapsed_time = 0;
loop {
let instant = Instant::now();
let result = future::table_mgmt::move_closed_positions().await;
// send Task#0 a message to notify running on
// match result {
// Ok(T) => {
// }
// Err(E) => {}
// }
// sleep as much as the loop recurs per 1 second if all operation finished within 1 second.
elapsed_time = instant.elapsed().as_millis();
if 100 > elapsed_time {
sleep(Duration::from_millis((100 - elapsed_time) as u64)).await;
}
}
});
loop {}
Ok(())
}
fn program_setting() {
let matches = Command::new("Tradingbot")
.arg(
arg!(log_level: -l --log <level> "Select log level: trace, debug, info, warn, error")
.default_value("error"),
)
.arg(arg!(mode: -m --mode <mode> "Select mode: real, simul, test").required(true))
.get_matches();
// set log level
set_up_color_terminal();
if let Some(level) = matches.get_one::<String>("log_level") {
match level.clone().to_ascii_lowercase().as_str() {
"trace" => simple_logger::init_with_level(Level::Trace).unwrap(),
"debug" => simple_logger::init_with_level(Level::Debug).unwrap(),
"info" => simple_logger::init_with_level(Level::Info).unwrap(),
"warn" => simple_logger::init_with_level(Level::Warn).unwrap(),
"error" => simple_logger::init_with_level(Level::Error).unwrap(),
_ => {
simple_logger::init_with_level(Level::Error).unwrap();
log::error!("wrong log level argument.");
std::process::exit(0);
}
}
}
// set mode
let mut mode: String = matches.get_one::<String>("mode").unwrap().clone();
unsafe {
match mode.to_ascii_lowercase().as_str() {
"real" => {
RUNNING_MODE = RunningMode::REAL;
println!("*** REAL MODE ***");
}
"simul" => {
RUNNING_MODE = RunningMode::SIMUL;
println!("*** SIMULATION MODE ***");
}
"test" => {
RUNNING_MODE = RunningMode::TEST;
println!("*** TEST MODE ***");
}
_ => {
log::error!("wrong mode argument.");
std::process::exit(0);
}
}
}
}