1411 lines
60 KiB
Rust
1411 lines
60 KiB
Rust
#![allow(unused)]
|
|
#![allow(warnings)]
|
|
|
|
use crate::coin_health_check_team::*;
|
|
use crate::request_candles::CandleData;
|
|
use crate::request_others::{ExchangeInfo, TradeFee};
|
|
use crate::server_health_check_team::ServerHealth;
|
|
use crate::strategy_team::AllData;
|
|
use crate::time_checking_team::UserTime;
|
|
use crate::value_estimation_team::datapoints::price_data::RealtimePriceData;
|
|
use clap::{arg, Command};
|
|
use log::Level;
|
|
use reqwest::{Client, ClientBuilder};
|
|
use rust_decimal::Decimal;
|
|
use simple_logger::set_up_color_terminal;
|
|
use sqlx::{mysql::*, Connection, Executor, FromRow, Row};
|
|
use std::collections::{HashMap, HashSet};
|
|
use std::{
|
|
io::{self, Write},
|
|
sync::Arc,
|
|
};
|
|
use tokio::{fs::*, join, sync::mpsc, sync::watch, sync::Mutex, task::*, time::*};
|
|
use tradingbot::{RunningMode::*, *};
|
|
|
|
#[tokio::main]
|
|
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|
// parse argument and set program preference
|
|
program_setting();
|
|
|
|
// Datebase initialization
|
|
initialization::initialization().await;
|
|
|
|
// // let c1 = vec!(("name", "Shima Nabil"));
|
|
// // println!("{:?}", database_control::update_record(&table_name, &c1, &("grade", "3")).await);
|
|
|
|
// //database_control::exists_table(&table_name).await;
|
|
// // let table_name = String::from("all24hstatistics");
|
|
// // let result = database_control::count_rows(&table_name).await;
|
|
// // println!("{:?}", result);
|
|
|
|
// // let rows = sqlx::query("select * from serverhealth")
|
|
// // .map(|r| {
|
|
// // a.id = r.get::<i32, _>("id");
|
|
// // a.waiting_maximum = r.get::<i32, _>("waiting_maximum");
|
|
// // a.server_on = r.get::<bool, _>("server_on");
|
|
// // a.ping_on = r.get::<bool, _>("ping_on");
|
|
// // a.wallet_system_on = r.get::<bool, _>("wallet_system_on");
|
|
// //
|
|
// // })
|
|
// // .collect::<Vec<String>>()
|
|
// // .fetch(&pool).await;
|
|
|
|
// // let str_result = rows
|
|
// // .iter()
|
|
// // .map(|r| format!("{} {}", r.get::<i32, _>("id"), r.get::<bool, _>("server_on")))
|
|
// // .collect::<Vec<String>>()
|
|
// // .join(", ");
|
|
// // println!("{:?}", rows);
|
|
// //
|
|
|
|
let (tx_task1, mut rx_task0) = mpsc::unbounded_channel::<u32>();
|
|
let tx_task2 = tx_task1.clone(); // for Task#2
|
|
let tx_task3 = tx_task1.clone(); // for Task#3
|
|
let tx_task4 = tx_task1.clone(); // for Task#4
|
|
let tx_task5 = tx_task1.clone(); // for Task#5
|
|
let tx_task6 = tx_task1.clone(); // for Task#6
|
|
let tx_task7 = tx_task1.clone(); // for Task#7
|
|
let tx_task8 = tx_task1.clone(); // for Task#8
|
|
let tx_task9 = tx_task1.clone(); // for Task#9
|
|
let tx_task10 = tx_task1.clone(); // for Task#10
|
|
let tx_task11 = tx_task1.clone(); // for Task#11
|
|
let tx_task12 = tx_task1.clone(); // for Task#12
|
|
let tx_task13 = tx_task1.clone(); // for Task#13
|
|
let tx_task14 = tx_task1.clone(); // for Task#14
|
|
let tx_task15 = tx_task1.clone(); // for Task#15
|
|
let tx_task16 = tx_task1.clone(); // for Task#16
|
|
let tx_task17 = tx_task1.clone(); // for Task#17
|
|
let tx_task18 = tx_task1.clone(); // for Task#18
|
|
let tx_task19 = tx_task1.clone(); // for Task#19
|
|
let tx_task20 = tx_task1.clone(); // for Task#20
|
|
let tx_task21 = tx_task1.clone(); // for Task#21
|
|
let tx_task22 = tx_task1.clone(); // for Task#22
|
|
let tx_task23 = tx_task1.clone(); // for Task#23
|
|
let tx_task24 = tx_task1.clone(); // for Task#24
|
|
let tx_task25 = tx_task1.clone(); // for Task#25
|
|
let tx_task26 = tx_task1.clone(); // for Task#26
|
|
let tx_task27 = tx_task1.clone(); // for Task#27
|
|
|
|
let (local_epoch_tx1, mut local_epoch_rx1) = watch::channel(0); // local_epoch
|
|
let (epoch_difference_tx1, mut epoch_difference_rx1) = watch::channel(0); // epoch_difference
|
|
let (local_epoch_tx2, mut local_epoch_rx2) = watch::channel(0); // local_epoch
|
|
let (epoch_difference_tx2, mut epoch_difference_rx2) = watch::channel(0); // epoch_difference
|
|
|
|
// trade fee data
|
|
let mut tradefee_map: HashMap<String, TradeFee> = HashMap::new(); // <symbol, TradeFee>
|
|
let (tx_tradefee_map, mut rx_tradefee_map) = watch::channel(tradefee_map);
|
|
let mut rx2_tradefee_map = rx_tradefee_map.clone();
|
|
let mut rx3_tradefee_map = rx_tradefee_map.clone();
|
|
let mut rx4_tradefee_map = rx_tradefee_map.clone();
|
|
let mut rx5_tradefee_map = rx_tradefee_map.clone();
|
|
let mut tradefee_map_capacity = rx_tradefee_map.clone();
|
|
|
|
// valid usdt trade data
|
|
let mut valid_usdt_trade_set: HashSet<String> = HashSet::new(); // symbol
|
|
let (tx_valid_usdt_trade_set, mut rx_valid_usdt_trade_set) =
|
|
watch::channel(valid_usdt_trade_set);
|
|
let mut rx2_valid_usdt_trade_set = rx_valid_usdt_trade_set.clone();
|
|
let mut rx3_valid_usdt_trade_set = rx_valid_usdt_trade_set.clone();
|
|
let mut rx4_valid_usdt_trade_set = rx_valid_usdt_trade_set.clone();
|
|
let mut valid_usdt_trade_set_capacity = rx_valid_usdt_trade_set.clone();
|
|
|
|
// price per second data and channels
|
|
let mut price_map: HashMap<String, f64> = HashMap::new(); // <symbol, price>
|
|
let (tx_price_map, mut rx_price_map) = watch::channel(price_map);
|
|
let mut rx3_price_map = rx_price_map.clone();
|
|
let mut rx5_price_map = rx_price_map.clone();
|
|
let mut price_map_capacity = rx_price_map.clone();
|
|
|
|
// candle data from endpoint and channels
|
|
let mut candle_1m_map: HashMap<String, Vec<CandleData>> = HashMap::new(); // <symbol, Vec<CandleData>>
|
|
let (tx_candle_1m_map, mut rx_candle_1m_map) = watch::channel(candle_1m_map);
|
|
let mut candle_1m_map_capacity = rx_candle_1m_map.clone();
|
|
|
|
let mut candle_30m_map: HashMap<String, Vec<CandleData>> = HashMap::new();
|
|
let (tx_candle_30m_map, mut rx_candle_30m_map) = watch::channel(candle_30m_map);
|
|
let mut candle_30m_map_capacity = rx_candle_30m_map.clone();
|
|
|
|
let mut candle_1d_map: HashMap<String, Vec<CandleData>> = HashMap::new();
|
|
let (tx_candle_1d_map, mut rx_candle_1d_map) = watch::channel(candle_1d_map);
|
|
let mut candle_1d_map_capacity = rx_candle_1d_map.clone();
|
|
|
|
let mut candle_1w_map: HashMap<String, Vec<CandleData>> = HashMap::new();
|
|
let (tx_candle_1w_map, mut rx_candle_1w_map) = watch::channel(candle_1w_map);
|
|
let mut candle_1w_map_capacity = rx_candle_1w_map.clone();
|
|
|
|
let mut candle_1mon_map: HashMap<String, Vec<CandleData>> = HashMap::new();
|
|
let (tx_candle_1mon_map, mut rx_candle_1mon_map) = watch::channel(candle_1mon_map);
|
|
let mut candle_1mon_map_capacity = rx_candle_1mon_map.clone();
|
|
|
|
// real-time reflected price data and channels
|
|
let mut rt_price_1m_map: HashMap<String, Vec<RealtimePriceData>> = HashMap::new(); // <symbol, Vec<RealtimePriceData>>
|
|
let (tx_rt_price_1m_map, mut rx_rt_price_1m_map) = watch::channel(rt_price_1m_map);
|
|
let mut rx2_rt_price_1m_map = rx_rt_price_1m_map.clone();
|
|
let mut rx3_rt_price_1m_map = rx_rt_price_1m_map.clone();
|
|
let mut rx4_rt_price_1m_map = rx_rt_price_1m_map.clone();
|
|
let mut rx5_rt_price_1m_map = rx_rt_price_1m_map.clone();
|
|
let mut rt_price_1m_map_capacity = rx_rt_price_1m_map.clone();
|
|
|
|
let mut rt_price_30m_map: HashMap<String, Vec<RealtimePriceData>> = HashMap::new();
|
|
let (tx_rt_price_30m_map, mut rx_rt_price_30m_map) = watch::channel(rt_price_30m_map);
|
|
let mut rx2_rt_price_30m_map = rx_rt_price_30m_map.clone();
|
|
let mut rx3_rt_price_30m_map = rx_rt_price_30m_map.clone();
|
|
let mut rx4_rt_price_30m_map = rx_rt_price_30m_map.clone();
|
|
let mut rx5_rt_price_30m_map = rx_rt_price_30m_map.clone();
|
|
let mut rt_price_30m_map_capacity = rx_rt_price_30m_map.clone();
|
|
|
|
let mut rt_price_1d_map: HashMap<String, Vec<RealtimePriceData>> = HashMap::new();
|
|
let (tx_rt_price_1d_map, mut rx_rt_price_1d_map) = watch::channel(rt_price_1d_map);
|
|
let mut rx2_rt_price_1d_map = rx_rt_price_1d_map.clone();
|
|
let mut rx3_rt_price_1d_map = rx_rt_price_1d_map.clone();
|
|
let mut rx4_rt_price_1d_map = rx_rt_price_1d_map.clone();
|
|
let mut rt_price_1d_map_capacity = rx_rt_price_1d_map.clone();
|
|
|
|
let mut rt_price_1w_map: HashMap<String, Vec<RealtimePriceData>> = HashMap::new();
|
|
let (tx_rt_price_1w_map, mut rx_rt_price_1w_map) = watch::channel(rt_price_1w_map);
|
|
let mut rx2_rt_price_1w_map = rx_rt_price_1w_map.clone();
|
|
let mut rx3_rt_price_1w_map = rx_rt_price_1w_map.clone();
|
|
let mut rx4_rt_price_1w_map = rx_rt_price_1w_map.clone();
|
|
let mut rt_price_1w_map_capacity = rx_rt_price_1w_map.clone();
|
|
|
|
let mut rt_price_1mon_map: HashMap<String, Vec<RealtimePriceData>> = HashMap::new();
|
|
let (tx_rt_price_1mon_map, mut rx_rt_price_1mon_map) = watch::channel(rt_price_1mon_map);
|
|
let mut rx2_rt_price_1mon_map = rx_rt_price_1mon_map.clone();
|
|
let mut rx3_rt_price_1mon_map = rx_rt_price_1mon_map.clone();
|
|
let mut rx4_rt_price_1mon_map = rx_rt_price_1mon_map.clone();
|
|
let mut rt_price_1mon_map_capacity = rx_rt_price_1mon_map.clone();
|
|
|
|
// TEMA data
|
|
// let mut tema3_1m_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new(); // Vec<(symbol, Vec<(price, closetime)>)>
|
|
// let (tx_tema3_1m_data, mut rx_tema3_1m_data) = watch::channel(tema3_1m_data);
|
|
// let mut tema3_30m_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new();
|
|
// let (tx_tema3_30m_data, mut rx_tema3_30m_data) = watch::channel(tema3_30m_data);
|
|
// let mut tema3_1d_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new();
|
|
// let (tx_tema3_1d_data, mut rx_tema3_1d_data) = watch::channel(tema3_1d_data);
|
|
// let mut tema3_1w_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new();
|
|
// let (tx_tema3_1w_data, mut rx_tema3_1w_data) = watch::channel(tema3_1w_data);
|
|
// let mut tema3_1mon_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new();
|
|
// let (tx_tema3_1mon_data, mut rx_tema3_1mon_data) = watch::channel(tema3_1mon_data);
|
|
|
|
// let mut tema10_1m_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new(); // Vec<(symbol, Vec<(price, closetime)>)>
|
|
// let (tx_tema10_1m_data, mut rx_tema10_1m_data) = watch::channel(tema10_1m_data);
|
|
// let mut tema10_30m_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new();
|
|
// let (tx_tema10_30m_data, mut rx_tema10_30m_data) = watch::channel(tema10_30m_data);
|
|
// let mut tema10_1d_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new();
|
|
// let (tx_tema10_1d_data, mut rx_tema10_1d_data) = watch::channel(tema10_1d_data);
|
|
// let mut tema10_1w_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new();
|
|
// let (tx_tema10_1w_data, mut rx_tema10_1w_data) = watch::channel(tema10_1w_data);
|
|
// let mut tema10_1mon_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new();
|
|
// let (tx_tema10_1mon_data, mut rx_tema10_1mon_data) = watch::channel(tema10_1mon_data);
|
|
|
|
// let mut tema30_1m_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new(); // Vec<(symbol, Vec<(price, closetime)>)>
|
|
// let (tx_tema30_1m_data, mut rx_tema30_1m_data) = watch::channel(tema30_1m_data);
|
|
// let mut tema30_30m_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new();
|
|
// let (tx_tema30_30m_data, mut rx_tema30_30m_data) = watch::channel(tema30_30m_data);
|
|
// let mut tema30_1d_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new();
|
|
// let (tx_tema30_1d_data, mut rx_tema30_1d_data) = watch::channel(tema30_1d_data);
|
|
// let mut tema30_1w_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new();
|
|
// let (tx_tema30_1w_data, mut rx_tema30_1w_data) = watch::channel(tema30_1w_data);
|
|
// let mut tema30_1mon_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new();
|
|
// let (tx_tema30_1mon_data, mut rx_tema30_1mon_data) = watch::channel(tema30_1mon_data);
|
|
|
|
// Exchange Information data
|
|
let mut exchange_info_map: HashMap<String, ExchangeInfo> = HashMap::new();
|
|
let (tx_exchange_info_map, mut rx_exchange_info_map) = watch::channel(exchange_info_map);
|
|
let mut rx2_exchange_info_map = rx_exchange_info_map.clone();
|
|
let mut rx3_exchange_info_map = rx_exchange_info_map.clone();
|
|
let mut rx4_exchange_info_map = rx_exchange_info_map.clone();
|
|
let mut rx5_exchange_info_map = rx_exchange_info_map.clone();
|
|
let mut rx6_exchange_info_map = rx_exchange_info_map.clone();
|
|
let mut exchange_info_map_capacity = rx_exchange_info_map.clone();
|
|
|
|
{
|
|
// pre-fetching candle data: 30m, 1d, 1w and 1mon
|
|
print!("pre-fetching candle data: 30m, 1d, 1w and 1mon..");
|
|
std::io::stdout().flush();
|
|
let instant = Instant::now();
|
|
let interval_30m = String::from("30m");
|
|
let interval_1d = String::from("1d");
|
|
let interval_1w = String::from("1w");
|
|
let interval_1mon = String::from("1mon");
|
|
|
|
let mut candle_30m_map_temp: HashMap<String, Vec<CandleData>> = HashMap::new();
|
|
let mut candle_1d_map_temp: HashMap<String, Vec<CandleData>> = HashMap::new();
|
|
let mut candle_1w_map_temp: HashMap<String, Vec<CandleData>> = HashMap::new();
|
|
let mut candle_1mon_map_temp: HashMap<String, Vec<CandleData>> = HashMap::new();
|
|
|
|
request_candles::fetch_candle_parallel(&interval_30m, &mut candle_30m_map_temp).await;
|
|
request_candles::fetch_candle_parallel(&interval_1d, &mut candle_1d_map_temp).await;
|
|
request_candles::fetch_candle_parallel(&interval_1w, &mut candle_1w_map_temp).await;
|
|
request_candles::fetch_candle_parallel(&interval_1mon, &mut candle_1mon_map_temp).await;
|
|
|
|
tx_candle_30m_map.send_modify(|map| *map = candle_30m_map_temp);
|
|
tx_candle_1d_map.send_modify(|map| *map = candle_1d_map_temp);
|
|
tx_candle_1w_map.send_modify(|map| *map = candle_1w_map_temp);
|
|
tx_candle_1mon_map.send_modify(|map| *map = candle_1mon_map_temp);
|
|
|
|
// sleep as much as the loop recurs per 60 seconds, expecting child threads will have finished within 60 seconds.
|
|
println!("Ok..");
|
|
|
|
unsafe {
|
|
if RUNNING_MODE == REAL || RUNNING_MODE == SIMUL {
|
|
let mut elapsed_time = instant.elapsed().as_secs();
|
|
let mut remaining_time: u64 = 60 - elapsed_time;
|
|
|
|
while remaining_time > 0 && 60 > remaining_time {
|
|
print!("\rstart tradingbot in {:2} seconds", remaining_time);
|
|
io::stdout().flush();
|
|
elapsed_time = instant.elapsed().as_secs();
|
|
loop {
|
|
let temp_elapsed_time = instant.elapsed().as_secs();
|
|
let is_overflowed = remaining_time.overflowing_sub(1).1;
|
|
|
|
if is_overflowed == true {
|
|
break;
|
|
}
|
|
if elapsed_time < temp_elapsed_time {
|
|
remaining_time -= 1;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
println!("\nTradingbot is running");
|
|
}
|
|
}
|
|
}
|
|
|
|
// Task#0: monitoring tasks
|
|
tokio::task::spawn(async move {
|
|
loop {
|
|
let result = rx_task0.recv().await;
|
|
|
|
match result {
|
|
Some(_) => {
|
|
print!("\r{:2}", result.unwrap());
|
|
io::stdout().flush();
|
|
}
|
|
None => {}
|
|
}
|
|
sleep(Duration::from_millis(50)).await;
|
|
}
|
|
});
|
|
|
|
// Task#1: check server time and health
|
|
tokio::task::spawn(async move {
|
|
let mut prev_server_epoch: u128 = 0;
|
|
let mut epoch_difference: f64 = 0.0;
|
|
let mut epoch_mean: f64 = 0.0;
|
|
let mut elapsed_time: u128 = 0;
|
|
|
|
loop {
|
|
let instant = Instant::now();
|
|
let client = ClientBuilder::new()
|
|
.timeout(Duration::from_millis(1000))
|
|
.build()
|
|
.unwrap();
|
|
let mut usertime = UserTime::new();
|
|
let mut serverhealth = ServerHealth::new();
|
|
let mut epoch_difference_vec: Vec<f64> = Vec::new();
|
|
// let mut usertime_mutex = usertime_arc.lock().await;
|
|
// server_health_check_team::execute_server_health_check(&mut usertime_mutex).await;
|
|
// time_checking_team::execute_time_check(&mut usertime_mutex).await;
|
|
let result = server_health_check_team::execute_server_health_check(
|
|
&mut usertime,
|
|
&mut serverhealth,
|
|
&client,
|
|
)
|
|
.await;
|
|
while let Err(E) = time_checking_team::execute_time_check(&mut usertime, &client).await
|
|
{
|
|
sleep(Duration::from_millis((100))).await;
|
|
while let Err(e) = server_health_check_team::execute_server_health_check(
|
|
&mut usertime,
|
|
&mut serverhealth,
|
|
&client,
|
|
)
|
|
.await
|
|
{}
|
|
}
|
|
|
|
match result {
|
|
Ok(T) => {
|
|
local_epoch_tx1
|
|
.send(usertime.local_epoch)
|
|
.expect("local_epoch_tx1-local_epoch_rx1 channel has been closed.");
|
|
epoch_difference_tx1.send(usertime.epoch_difference).expect(
|
|
"epoch_difference_tx1-epoch_difference_rx1 channel has been closed.",
|
|
);
|
|
local_epoch_tx2
|
|
.send(usertime.local_epoch)
|
|
.expect("local_epoch_tx2-local_epoch_rx2 channel has been closed.");
|
|
epoch_difference_tx2.send(usertime.epoch_difference).expect(
|
|
"epoch_difference_tx2-epoch_difference_rx2 channel has been closed.",
|
|
);
|
|
tx_task1.send(1).expect("The mpsc channel has been closed.");
|
|
}
|
|
Err(E) => {}
|
|
}
|
|
|
|
// sleep as much as the loop recurs per 1 second if all operation finished within 1 second.
|
|
// considering RTT and to make the wait time 1 second, latency each second for every 30 seconds is averaged and subtracted with wait time.
|
|
elapsed_time = instant.elapsed().as_millis();
|
|
if 1_000 > elapsed_time {
|
|
if prev_server_epoch != 0 {
|
|
epoch_mean = 0.0;
|
|
|
|
epoch_difference =
|
|
(usertime.server_epoch as f64) - (prev_server_epoch as f64) - 1000.0; // milli second
|
|
|
|
if epoch_difference > 300.0 || epoch_difference < -300.0 {
|
|
epoch_difference_vec.clear();
|
|
epoch_difference = 0.0;
|
|
}
|
|
|
|
if epoch_difference_vec.len() == 10 {
|
|
epoch_difference_vec.rotate_left(1);
|
|
epoch_difference_vec.pop();
|
|
epoch_difference_vec.push(epoch_difference);
|
|
} else if epoch_difference_vec.len() < 10 {
|
|
epoch_difference_vec.push(epoch_difference);
|
|
}
|
|
for element in &epoch_difference_vec {
|
|
epoch_mean += *element;
|
|
}
|
|
epoch_mean = epoch_mean / (epoch_difference_vec.len() as f64);
|
|
}
|
|
if epoch_mean > 0.0 {
|
|
if elapsed_time > ((epoch_mean) as u128)
|
|
&& 1_000 > elapsed_time + ((epoch_mean) as u128)
|
|
{
|
|
sleep(Duration::from_millis(
|
|
(1_000 - elapsed_time - ((epoch_mean) as u128)) as u64,
|
|
))
|
|
.await;
|
|
} else {
|
|
sleep(Duration::from_millis((1_000 - elapsed_time) as u64)).await;
|
|
}
|
|
} else {
|
|
sleep(Duration::from_millis(
|
|
(1_000 - elapsed_time + ((epoch_mean.abs()) as u128)) as u64,
|
|
))
|
|
.await;
|
|
}
|
|
}
|
|
prev_server_epoch = usertime.server_epoch;
|
|
}
|
|
});
|
|
|
|
// Task#2: request trade fee
|
|
tokio::task::spawn(async move {
|
|
let mut elapsed_time = 0;
|
|
loop {
|
|
let instant = Instant::now();
|
|
let client = ClientBuilder::new()
|
|
.timeout(tokio::time::Duration::from_millis(3000))
|
|
.build()
|
|
.unwrap();
|
|
let tx1_changed = local_epoch_rx1.changed().await;
|
|
let tx2_changed = epoch_difference_rx1.changed().await;
|
|
let local_epoch = *local_epoch_rx1.borrow();
|
|
let difference_epoch = *epoch_difference_rx1.borrow();
|
|
match tx1_changed {
|
|
Ok(T) => match tx2_changed {
|
|
Ok(T) => {
|
|
let mut tradefee_vec_temp: HashMap<String, TradeFee> = HashMap::new();
|
|
loop {
|
|
let result = request_others::request_trade_fee(
|
|
API_KEY,
|
|
SECRET_KEY,
|
|
local_epoch,
|
|
difference_epoch,
|
|
&client,
|
|
&mut tradefee_vec_temp,
|
|
)
|
|
.await;
|
|
|
|
if tradefee_vec_temp.len() == 0 {
|
|
sleep(Duration::from_secs(3)).await;
|
|
} else {
|
|
break;
|
|
}
|
|
}
|
|
|
|
tx_tradefee_map.send_modify(|vec| *vec = tradefee_vec_temp);
|
|
tx_task2.send(2).expect("The mpsc channel has been closed.");
|
|
}
|
|
Err(E) => {
|
|
panic!("tx2-rx2 channel has been closed.")
|
|
}
|
|
},
|
|
Err(E) => {
|
|
panic!("tx1-rx1 channel has been closed.")
|
|
}
|
|
}
|
|
|
|
// sleep as much as the loop recurs per 60 seconds if all operation finished within 60 seconds.
|
|
elapsed_time = instant.elapsed().as_millis();
|
|
if 60_000 > elapsed_time {
|
|
sleep(Duration::from_millis((60_000 - elapsed_time) as u64)).await;
|
|
}
|
|
}
|
|
});
|
|
|
|
// Task#3: request lot stepsize and ticksize
|
|
tokio::task::spawn(async move {
|
|
loop {
|
|
let client = ClientBuilder::new()
|
|
.timeout(tokio::time::Duration::from_millis(3000))
|
|
.build()
|
|
.unwrap();
|
|
let mut exchange_info_map_temp: HashMap<String, ExchangeInfo> = HashMap::new();
|
|
let mut result;
|
|
loop {
|
|
result = coin_health_check_team::request_others::request_exchange_infomation(
|
|
&client,
|
|
&mut exchange_info_map_temp,
|
|
)
|
|
.await;
|
|
|
|
// retry
|
|
if exchange_info_map_temp.len() == 0 {
|
|
sleep(Duration::from_secs(3)).await;
|
|
} else {
|
|
break;
|
|
}
|
|
}
|
|
|
|
tx_exchange_info_map.send_modify(|vec| *vec = exchange_info_map_temp);
|
|
tx_task3.send(3).expect("The mpsc channel has been closed.");
|
|
|
|
sleep(Duration::from_secs(300)).await; // sleep for 5 mins
|
|
}
|
|
});
|
|
|
|
// Task#4: request 24h price changes,
|
|
// pick valid USDT Trades,
|
|
// filtering stop USDT Trades,
|
|
// monitor total_24h_change_profit_index,
|
|
// usdt_24h_change_profit_index,
|
|
// total_price_down_dist_index
|
|
tokio::task::spawn(async move {
|
|
sleep(Duration::from_secs(10)).await;
|
|
let mut elapsed_time = 0;
|
|
loop {
|
|
let instant = Instant::now();
|
|
let client = ClientBuilder::new()
|
|
.timeout(tokio::time::Duration::from_millis(3000))
|
|
.build()
|
|
.unwrap();
|
|
let result = request_others::request_24hr_ticker_price_change_statistics(&client).await;
|
|
match result {
|
|
Ok(T) => {
|
|
let exchange_info_map = rx5_exchange_info_map.borrow().clone();
|
|
let mut valid_usdt_trade_set_temp: HashSet<String> = HashSet::new();
|
|
monitors::collect_valid_usde_trade(
|
|
&mut valid_usdt_trade_set_temp,
|
|
&exchange_info_map,
|
|
)
|
|
.await;
|
|
|
|
tx_valid_usdt_trade_set.send_modify(|vec| *vec = valid_usdt_trade_set_temp);
|
|
tx_task4.send(4).expect("The mpsc channel has been closed.");
|
|
}
|
|
Err(E) => {
|
|
log::warn!(">>> Failed to monitor usdt_24h_change_profit_index.");
|
|
}
|
|
}
|
|
// match result {
|
|
// Ok(T) => {
|
|
// let result = monitors::total_24h_change_profit_index().await;
|
|
// match result {
|
|
// Ok(T) => {},
|
|
// Err(E) => {
|
|
// log::warn!(">>> Failed to monitor total_24h_change_profit_index.");
|
|
// }
|
|
// };
|
|
|
|
// let result = monitors::usdt_24h_change_profit_index().await;
|
|
// match result {
|
|
// Ok(T) => {},
|
|
// Err(E) => {
|
|
// log::warn!(">>> Failed to monitor usdt_24h_change_profit_index.");
|
|
// }
|
|
// };
|
|
|
|
// let result = monitors::total_price_down_dist_index().await;
|
|
// match result {
|
|
// Ok(T) => {},
|
|
// Err(E) => {
|
|
// log::warn!(">>> Failed to monitor total_price_down_dist_index.");
|
|
// }
|
|
// };
|
|
// },
|
|
// Err(E) => {
|
|
// log::warn!(">>> Failed to fetch 24h price change data from endpoint or parse message.");
|
|
// }
|
|
// };
|
|
|
|
// sleep as much as the loop recurs per 30 seconds if all operation finished within 30 seconds.
|
|
elapsed_time = instant.elapsed().as_millis();
|
|
if 30_000 > elapsed_time {
|
|
sleep(Duration::from_millis((30_000 - elapsed_time) as u64)).await;
|
|
}
|
|
}
|
|
});
|
|
|
|
// Task#5: price per second
|
|
tokio::task::spawn(async move {
|
|
sleep(Duration::from_secs(20)).await;
|
|
let mut elapsed_time = 0;
|
|
loop {
|
|
let instant = Instant::now();
|
|
let client = ClientBuilder::new()
|
|
.timeout(tokio::time::Duration::from_millis(1000))
|
|
.build()
|
|
.unwrap();
|
|
let mut price_vec_temp: HashMap<String, f64> = HashMap::new();
|
|
let mut price_vec_temp_c: HashMap<String, f64> = HashMap::new();
|
|
request_others::request_all_coin_price(&client, &mut price_vec_temp).await;
|
|
price_vec_temp_c = price_vec_temp.clone();
|
|
tx_price_map.send_modify(|vec| *vec = price_vec_temp);
|
|
tx_task5.send(5).expect("The mpsc channel has been closed.");
|
|
|
|
// Task#10: make realtime price data of 1m, 30m, 1d, 1w and 1mon
|
|
let valid_usdt_trade_set = rx_valid_usdt_trade_set.borrow().clone();
|
|
// 1m
|
|
let interval = String::from("1m");
|
|
let candle_1m_vec = rx_candle_1m_map.borrow().clone();
|
|
let dummy_data: HashMap<String, Vec<RealtimePriceData>> = HashMap::new();
|
|
let mut rt_price_1m_map_write_temp: HashMap<String, Vec<RealtimePriceData>> =
|
|
HashMap::new();
|
|
let mut rt_price_1m_map_write_temp_c: HashMap<String, Vec<RealtimePriceData>> =
|
|
HashMap::new();
|
|
let result = value_estimation_team::datapoints::price_data::update_realtime_price_data(
|
|
&interval,
|
|
&candle_1m_vec,
|
|
&dummy_data,
|
|
&mut rt_price_1m_map_write_temp,
|
|
&price_vec_temp_c,
|
|
&valid_usdt_trade_set,
|
|
)
|
|
.await;
|
|
rt_price_1m_map_write_temp_c = rt_price_1m_map_write_temp.clone();
|
|
if tx_rt_price_1m_map.is_closed() {
|
|
log::error!("tx_rt_price_1m_vec has been closed!");
|
|
} else {
|
|
tx_rt_price_1m_map.send_modify(|vec| *vec = rt_price_1m_map_write_temp);
|
|
}
|
|
|
|
// 30m
|
|
let interval = String::from("30m");
|
|
let candle_30m_map = rx_candle_30m_map.borrow().clone();
|
|
let mut rt_price_30m_map_write_temp: HashMap<String, Vec<RealtimePriceData>> =
|
|
HashMap::new();
|
|
let mut rt_price_30m_map_write_temp_c: HashMap<String, Vec<RealtimePriceData>> =
|
|
HashMap::new();
|
|
if !rt_price_1m_map_write_temp_c.is_empty() {
|
|
let result =
|
|
value_estimation_team::datapoints::price_data::update_realtime_price_data(
|
|
&interval,
|
|
&candle_30m_map,
|
|
&rt_price_1m_map_write_temp_c,
|
|
&mut rt_price_30m_map_write_temp,
|
|
&price_vec_temp_c,
|
|
&valid_usdt_trade_set,
|
|
)
|
|
.await;
|
|
rt_price_30m_map_write_temp_c = rt_price_30m_map_write_temp.clone();
|
|
if tx_rt_price_30m_map.is_closed() {
|
|
log::error!("tx_rt_price_30m_vec has been closed!");
|
|
} else {
|
|
tx_rt_price_30m_map.send_modify(
|
|
|map: &mut HashMap<String, Vec<RealtimePriceData>>| {
|
|
*map = rt_price_30m_map_write_temp
|
|
},
|
|
);
|
|
}
|
|
}
|
|
|
|
// 1d
|
|
let interval = String::from("1d");
|
|
let candle_1d_vec = rx_candle_1d_map.borrow().clone();
|
|
let mut rt_price_1d_map_write_temp: HashMap<String, Vec<RealtimePriceData>> =
|
|
HashMap::new();
|
|
|
|
if !rt_price_30m_map_write_temp_c.is_empty() {
|
|
let result =
|
|
value_estimation_team::datapoints::price_data::update_realtime_price_data(
|
|
&interval,
|
|
&candle_1d_vec,
|
|
&rt_price_30m_map_write_temp_c,
|
|
&mut rt_price_1d_map_write_temp,
|
|
&price_vec_temp_c,
|
|
&valid_usdt_trade_set,
|
|
)
|
|
.await;
|
|
|
|
match result {
|
|
Ok(T) => {
|
|
if tx_rt_price_1d_map.is_closed() {
|
|
log::error!("tx_rt_price_1d_vec has been closed!");
|
|
} else {
|
|
tx_rt_price_1d_map.send_modify(|map| *map = rt_price_1d_map_write_temp);
|
|
}
|
|
}
|
|
Err(E) => {}
|
|
}
|
|
}
|
|
|
|
// { // 1w
|
|
// let interval = String::from("1w");
|
|
// let candle_1w_vec = rx_candle_1w_vec.borrow().clone();
|
|
// let rt_price_1d_vec = rx_rt_price_1d_vec.borrow().clone();
|
|
// let mut rt_price_1w_vec_write_temp: Vec<(String, Vec<RealtimePriceData>)> = Vec::new();
|
|
// let result = value_estimation_team::datapoints::price_data::update_realtime_price_data(&interval, &candle_1w_vec, &rt_price_1d_vec, &mut rt_price_1w_vec_write_temp, &price_vec_temp_c, &valid_usdt_trade_vec).await;
|
|
|
|
// match result {
|
|
// Ok(T) => {
|
|
// tx_rt_price_1w_vec.send_modify(|vec| *vec = rt_price_1w_vec_write_temp);
|
|
// tx_task10.send(10).expect("The mpsc channel has been closed.");
|
|
// }
|
|
// Err(E) => {}
|
|
// }
|
|
// }
|
|
|
|
// { // 1mon
|
|
// let interval = String::from("1mon");
|
|
// let candle_1mon_vec = rx_candle_1mon_vec.borrow().clone();
|
|
// let rt_price_1w_vec = rx_rt_price_1w_vec.borrow().clone();
|
|
// let mut rt_price_1mon_vec_write_temp: Vec<(String, Vec<RealtimePriceData>)> = Vec::new();
|
|
// let result = value_estimation_team::datapoints::price_data::update_realtime_price_data(&interval, &candle_1mon_vec, &rt_price_1w_vec, &mut rt_price_1mon_vec_write_temp, &price_vec_temp_c, &valid_usdt_trade_vec).await;
|
|
|
|
// match result {
|
|
// Ok(T) => {
|
|
// tx_rt_price_1mon_vec.send_modify(|vec| *vec = rt_price_1mon_vec_write_temp);
|
|
// tx_task10.send(10).expect("The mpsc channel has been closed.");
|
|
// }
|
|
// Err(E) => {}
|
|
// }
|
|
// }
|
|
|
|
// sleep as much as the loop recurs per 1 seconds if all operation finished within 1 seconds.
|
|
elapsed_time = instant.elapsed().as_millis();
|
|
if 500 > elapsed_time {
|
|
sleep(Duration::from_millis((500 - elapsed_time) as u64)).await;
|
|
}
|
|
}
|
|
});
|
|
|
|
// Task#6: fetch candle 1m
|
|
tokio::task::spawn(async move {
|
|
let mut elapsed_time = 0;
|
|
let interval = String::from("1m");
|
|
loop {
|
|
let instant = Instant::now();
|
|
let mut candle_1m_map_temp: HashMap<String, Vec<CandleData>> = HashMap::new();
|
|
let result =
|
|
request_candles::fetch_candle_parallel(&interval, &mut candle_1m_map_temp).await;
|
|
|
|
match result {
|
|
Ok(T) => {
|
|
tx_candle_1m_map.send_modify(|vec| *vec = candle_1m_map_temp);
|
|
tx_task6.send(6).expect("The mpsc channel has been closed.");
|
|
}
|
|
Err(E) => {}
|
|
}
|
|
|
|
// sleep as much as the loop recurs per 10 seconds, expecting child threads will have finished within 10 seconds.
|
|
elapsed_time = instant.elapsed().as_millis();
|
|
if 30_000 > elapsed_time {
|
|
// 10_000 for major trade
|
|
sleep(Duration::from_millis((30_000 - elapsed_time) as u64)).await;
|
|
}
|
|
}
|
|
});
|
|
|
|
// Task#7: fetch candle 30m
|
|
tokio::task::spawn(async move {
|
|
let mut elapsed_time = 0;
|
|
let interval = String::from("30m");
|
|
loop {
|
|
let instant = Instant::now();
|
|
let mut candle_30m_map_temp: HashMap<String, Vec<CandleData>> = HashMap::new();
|
|
let result =
|
|
request_candles::fetch_candle_delay(&interval, &mut candle_30m_map_temp).await;
|
|
// request_candles::fetch_candle_parallel(&interval, &mut candle_30m_vec_temp).await;
|
|
|
|
match result {
|
|
Ok(T) => {
|
|
tx_candle_30m_map.send_modify(|vec| *vec = candle_30m_map_temp);
|
|
tx_task7.send(7).expect("The mpsc channel has been closed.");
|
|
}
|
|
Err(E) => {}
|
|
}
|
|
// sleep as much as the loop recurs per 60 seconds, expecting child threads will have finished within 60 seconds.
|
|
elapsed_time = instant.elapsed().as_millis();
|
|
if 60_000 > elapsed_time {
|
|
//60_000
|
|
sleep(Duration::from_millis((60_000 - elapsed_time) as u64)).await;
|
|
}
|
|
}
|
|
});
|
|
|
|
// Task#8: fetch candle 1d
|
|
tokio::task::spawn(async move {
|
|
let mut elapsed_time = 0;
|
|
let interval = String::from("1d");
|
|
sleep(Duration::from_secs(600)).await;
|
|
loop {
|
|
let instant = Instant::now();
|
|
let mut candle_1d_map_temp: HashMap<String, Vec<CandleData>> = HashMap::new();
|
|
let result =
|
|
request_candles::fetch_candle_delay(&interval, &mut candle_1d_map_temp).await;
|
|
|
|
match result {
|
|
Ok(T) => {
|
|
tx_candle_1d_map.send_modify(|vec| *vec = candle_1d_map_temp);
|
|
tx_task8.send(8).expect("The mpsc channel has been closed.");
|
|
}
|
|
Err(E) => {}
|
|
}
|
|
// sleep as much as the loop recurs per 1800 seconds, expecting child threads will have finished within 1800 seconds.
|
|
elapsed_time = instant.elapsed().as_secs();
|
|
if 1_800 > elapsed_time {
|
|
sleep(Duration::from_secs((1_800 - elapsed_time) as u64)).await;
|
|
}
|
|
}
|
|
});
|
|
|
|
// // Task#9: fetch candle 1w
|
|
// tokio::task::spawn(async move{
|
|
// let interval = String::from("1w");
|
|
// sleep(Duration::from_secs(600)).await;
|
|
// loop{
|
|
// let mut candle_1w_vec_temp: Vec<(String, Vec<CandleData>)> = Vec::new();
|
|
// let result = request_candles::fetch_candle_delay(&interval, &mut candle_1w_vec_temp).await;
|
|
|
|
// match result {
|
|
// Ok(T) => {
|
|
// tx_candle_1w_vec.send_modify(|vec| *vec = candle_1w_vec_temp);
|
|
// tx_task9.send(9).expect("The mpsc channel has been closed.");
|
|
// }
|
|
// Err(E) => {}
|
|
// }
|
|
// }
|
|
// });
|
|
|
|
// // Task#10: fetch candle 1mon
|
|
// tokio::task::spawn(async move{
|
|
// let interval = String::from("1mon");
|
|
// sleep(Duration::from_secs(600)).await;
|
|
// loop{
|
|
// let mut candle_1mon_vec_temp: Vec<(String, Vec<CandleData>)> = Vec::new();
|
|
// let result = request_candles::fetch_candle_delay(&interval, &mut candle_1mon_vec_temp).await;
|
|
|
|
// match result {
|
|
// Ok(T) => {
|
|
// tx_candle_1mon_vec.send_modify(|vec| *vec = candle_1mon_vec_temp);
|
|
// tx_task10.send(10).expect("The mpsc channel has been closed.");
|
|
// }
|
|
// Err(E) => {}
|
|
// }
|
|
// }
|
|
// });
|
|
|
|
// Task#11: monitoring total market cap
|
|
// if RUNNING_MODE == REAL || RUNNING_MODE == SIMUL || RUNNING_MODE == TEST {
|
|
// tokio::task::spawn(async move {
|
|
// loop {
|
|
// let result = signal_association::coinmarketcap::market_cap_index().await;
|
|
// match result {
|
|
// Ok(T) => {
|
|
// tx_task11
|
|
// .send(11)
|
|
// .expect("The mpsc channel has been closed.");
|
|
// }
|
|
// Err(E) => {}
|
|
// }
|
|
// }
|
|
// });
|
|
// }
|
|
|
|
// Task#12: monitoring foreign exchange rate
|
|
// if RUNNING_MODE == REAL || RUNNING_MODE == SIMUL {
|
|
// tokio::task::spawn(async move {
|
|
// loop {
|
|
// let result = signal_association::exchange_rate::monitoring_fx_rate_index().await;
|
|
// match result {
|
|
// Ok(T) => {
|
|
// tx_task12
|
|
// .send(12)
|
|
// .expect("The mpsc channel has been closed.");
|
|
// }
|
|
// Err(E) => {}
|
|
// }
|
|
// sleep(Duration::from_secs(2700)).await; // sleep for 45mins
|
|
// }
|
|
// });
|
|
// }
|
|
|
|
// // // Task#13: monitoring dollar index
|
|
// // tokio::task::spawn(async move {
|
|
// // loop
|
|
// // {
|
|
// // let result = signal_association::dollar_index::monitoring_dollar_index().await;
|
|
// // match result {
|
|
// // Ok(T) => {
|
|
// // tx_task13.send(13).expect("The mpsc channel has been closed.");
|
|
// // }
|
|
// // Err(E) => {}
|
|
// // }
|
|
// // sleep(Duration::from_secs(1800)).await; // sleep for 30mins
|
|
// // }
|
|
// // });
|
|
|
|
// Task#14: monitoring signal decision
|
|
// if RUNNING_MODE == REAL || RUNNING_MODE == SIMUL || RUNNING_MODE == TEST {
|
|
// tokio::task::spawn(async move {
|
|
// loop {
|
|
// let result =
|
|
// signal_association::signal_decision::monitoring_signal_decision().await;
|
|
// match result {
|
|
// Ok(T) => {
|
|
// tx_task14
|
|
// .send(14)
|
|
// .expect("The mpsc channel has been closed.");
|
|
// }
|
|
// Err(E) => {}
|
|
// }
|
|
|
|
// sleep(Duration::from_millis(500)).await; // sleep for 0.5sec
|
|
// }
|
|
// });
|
|
// }
|
|
|
|
// Task#15: monitoring future ratio
|
|
// if RUNNING_MODE == REAL || RUNNING_MODE == SIMUL || RUNNING_MODE == TEST {
|
|
// tokio::task::spawn(async move {
|
|
// loop {
|
|
// let result = signal_association::future_ratio::monitoring_future_ratio().await;
|
|
// match result {
|
|
// Ok(T) => {
|
|
// tx_task15
|
|
// .send(15)
|
|
// .expect("The mpsc channel has been closed.");
|
|
// }
|
|
// Err(E) => {}
|
|
// }
|
|
|
|
// sleep(Duration::from_secs(60)).await; // sleep for 1min
|
|
// }
|
|
// });
|
|
// }
|
|
|
|
// COEX part
|
|
// Task#16: execute strategis for buy
|
|
unsafe {
|
|
if RUNNING_MODE == REAL || RUNNING_MODE == SIMUL {
|
|
tokio::task::spawn(async move {
|
|
sleep(Duration::from_secs(40)).await;
|
|
let mut elapsed_time = 0;
|
|
loop {
|
|
let instant = Instant::now();
|
|
let mut all_data = AllData::new();
|
|
all_data.valid_symbol_vec = rx3_valid_usdt_trade_set.borrow().clone();
|
|
|
|
// realtime price data
|
|
all_data.rt_price_1m_vec = rx3_rt_price_1m_map.borrow().clone();
|
|
all_data.rt_price_30m_vec = rx3_rt_price_30m_map.borrow().clone();
|
|
all_data.rt_price_1d_vec = rx3_rt_price_1d_map.borrow().clone();
|
|
all_data.rt_price_1w_vec = rx3_rt_price_1w_map.borrow().clone();
|
|
all_data.rt_price_1mon_vec = rx3_rt_price_1mon_map.borrow().clone();
|
|
|
|
let result =
|
|
strategy_team::strategy_manager::execute_list_up_for_buy(&all_data).await;
|
|
|
|
match result {
|
|
Ok(T) => {
|
|
tx_task16
|
|
.send(16)
|
|
.expect("The mpsc channel has been closed.");
|
|
}
|
|
Err(E) => {
|
|
// log::error!("Couldn't execute strategists.");
|
|
}
|
|
}
|
|
|
|
// sleep as much as the loop recurs per 1 second if all operation finished within 1 second.
|
|
elapsed_time = instant.elapsed().as_millis();
|
|
if 250 > elapsed_time {
|
|
sleep(Duration::from_millis((250 - elapsed_time) as u64)).await;
|
|
}
|
|
}
|
|
});
|
|
} else {
|
|
tokio::task::spawn(async move {
|
|
sleep(Duration::from_secs(10)).await;
|
|
let mut elapsed_time = 0;
|
|
let mut all_data = AllData::new();
|
|
loop {
|
|
let instant = Instant::now();
|
|
all_data.valid_symbol_vec = rx3_valid_usdt_trade_set.borrow().clone();
|
|
// realtime price data
|
|
all_data.rt_price_1m_vec = rx3_rt_price_1m_map.borrow().clone();
|
|
all_data.rt_price_30m_vec = rx3_rt_price_30m_map.borrow().clone();
|
|
all_data.rt_price_1d_vec = rx3_rt_price_1d_map.borrow().clone();
|
|
all_data.rt_price_1w_vec = rx3_rt_price_1w_map.borrow().clone();
|
|
all_data.rt_price_1mon_vec = rx3_rt_price_1mon_map.borrow().clone();
|
|
|
|
// let result = coex::strategy_team::execute_strategist_for_test(&all_data).await;
|
|
|
|
// match result {
|
|
// Ok(T) => {
|
|
// tx_task16
|
|
// .send(16)
|
|
// .expect("The mpsc channel has been closed.");
|
|
// }
|
|
// Err(E) => {}
|
|
// }
|
|
|
|
// sleep as much as the loop recurs per 1 second if all operation finished within 1 second.
|
|
elapsed_time = instant.elapsed().as_millis();
|
|
if 250 > elapsed_time {
|
|
sleep(Duration::from_millis((250 - elapsed_time) as u64)).await;
|
|
}
|
|
}
|
|
});
|
|
}
|
|
}
|
|
|
|
// Task#17: execute strategis for sell
|
|
unsafe {
|
|
if RUNNING_MODE == REAL || RUNNING_MODE == SIMUL {
|
|
tokio::task::spawn(async move {
|
|
sleep(Duration::from_secs(40)).await;
|
|
let mut elapsed_time = 0;
|
|
loop {
|
|
let instant = Instant::now();
|
|
let mut all_data = AllData::new();
|
|
let mut exchange_info_map: HashMap<String, ExchangeInfo> = HashMap::new();
|
|
let mut trade_fee_map: HashMap<String, TradeFee> = HashMap::new();
|
|
|
|
all_data.valid_symbol_vec = rx4_valid_usdt_trade_set.borrow().clone();
|
|
exchange_info_map = rx6_exchange_info_map.borrow().clone();
|
|
trade_fee_map = rx5_tradefee_map.borrow().clone();
|
|
// realtime price data
|
|
all_data.rt_price_1m_vec = rx5_rt_price_1m_map.borrow().clone();
|
|
all_data.rt_price_30m_vec = rx5_rt_price_30m_map.borrow().clone();
|
|
all_data.rt_price_1d_vec = rx4_rt_price_1d_map.borrow().clone();
|
|
all_data.rt_price_1w_vec = rx4_rt_price_1w_map.borrow().clone();
|
|
all_data.rt_price_1mon_vec = rx4_rt_price_1mon_map.borrow().clone();
|
|
|
|
let result = strategy_team::strategy_manager::execute_list_up_for_sell(
|
|
&all_data,
|
|
&exchange_info_map,
|
|
&trade_fee_map,
|
|
)
|
|
.await;
|
|
|
|
match result {
|
|
Ok(T) => {
|
|
tx_task17
|
|
.send(17)
|
|
.expect("The mpsc channel has been closed.");
|
|
}
|
|
Err(E) => {
|
|
// log::error!("Couldn't execute strategists.");
|
|
}
|
|
}
|
|
|
|
// sleep as much as the loop recurs per 1 second if all operation finished within 1 second.
|
|
elapsed_time = instant.elapsed().as_millis();
|
|
if 250 > elapsed_time {
|
|
sleep(Duration::from_millis((250 - elapsed_time) as u64)).await;
|
|
}
|
|
}
|
|
});
|
|
}
|
|
}
|
|
|
|
// Task#18: monitoring pre-suggested coins
|
|
tokio::task::spawn(async move {
|
|
sleep(Duration::from_secs(30)).await;
|
|
let mut elapsed_time = 0;
|
|
loop {
|
|
let instant = Instant::now();
|
|
let coin_price_vec = rx3_price_map.borrow().clone();
|
|
let result = coex::exchange_team::monitoring_pre_suggested_coins(&coin_price_vec).await;
|
|
|
|
// send Task#0 a message to notify running on
|
|
match result {
|
|
Ok(T) => {
|
|
tx_task18
|
|
.send(18)
|
|
.expect("The mpsc channel has been closed.");
|
|
}
|
|
Err(E) => {}
|
|
}
|
|
|
|
// sleep as much as the loop recurs per 1 second if all operation finished within 1 second.
|
|
elapsed_time = instant.elapsed().as_millis();
|
|
if 100 > elapsed_time {
|
|
sleep(Duration::from_millis((100 - elapsed_time) as u64)).await;
|
|
}
|
|
}
|
|
});
|
|
|
|
// Task#19: buy_coin
|
|
tokio::task::spawn(async move {
|
|
sleep(Duration::from_secs(30)).await;
|
|
|
|
let mut elapsed_time = 0;
|
|
loop {
|
|
let instant = Instant::now();
|
|
let exchange_info_map = rx_exchange_info_map.borrow().clone();
|
|
let trade_fee_map = rx_tradefee_map.borrow().clone();
|
|
let result = coex::exchange_team::buy_coin(&exchange_info_map, &trade_fee_map).await;
|
|
|
|
// send Task#0 a message to notify running on
|
|
match result {
|
|
Ok(T) => {
|
|
tx_task19
|
|
.send(19)
|
|
.expect("The mpsc channel has been closed.");
|
|
}
|
|
Err(E) => {
|
|
eprint!("Error: {:?}", E);
|
|
}
|
|
}
|
|
|
|
// sleep as much as the loop recurs per 200ms second if all operation finished within 200ms
|
|
elapsed_time = instant.elapsed().as_millis();
|
|
if 200 > elapsed_time {
|
|
sleep(Duration::from_millis((200 - elapsed_time) as u64)).await;
|
|
}
|
|
}
|
|
});
|
|
|
|
// Task#20: monitoring_open_buy_order
|
|
unsafe {
|
|
if RUNNING_MODE == REAL || RUNNING_MODE == TEST {
|
|
tokio::task::spawn(async move {
|
|
sleep(Duration::from_secs(30)).await;
|
|
let mut elapsed_time = 0;
|
|
loop {
|
|
let instant = Instant::now();
|
|
let client = ClientBuilder::new()
|
|
.timeout(tokio::time::Duration::from_millis(3000))
|
|
.build()
|
|
.unwrap();
|
|
let trade_fee_map = rx2_tradefee_map.borrow().clone();
|
|
let result =
|
|
coex::order_team::monitoring_open_buy_order(&client, &trade_fee_map).await;
|
|
|
|
// send Task#0 a message to notify running on
|
|
match result {
|
|
Ok(T) => {
|
|
tx_task20
|
|
.send(20)
|
|
.expect("The mpsc channel has been closed.");
|
|
}
|
|
Err(E) => {}
|
|
}
|
|
|
|
// sleep as much as the loop recurs per 1 second if all operation finished within 1 second.
|
|
elapsed_time = instant.elapsed().as_millis();
|
|
if 200 > elapsed_time {
|
|
sleep(Duration::from_millis((200 - elapsed_time) as u64)).await;
|
|
}
|
|
}
|
|
});
|
|
}
|
|
}
|
|
|
|
// Task#21: update_price_of_filled_buy_order
|
|
tokio::task::spawn(async move {
|
|
sleep(Duration::from_secs(30)).await;
|
|
let mut elapsed_time = 0;
|
|
loop {
|
|
let instant = Instant::now();
|
|
let coin_price_vec = rx5_price_map.borrow().clone();
|
|
let exchange_info_vec = rx2_exchange_info_map.borrow().clone();
|
|
let trade_fee_vec = rx3_tradefee_map.borrow().clone();
|
|
|
|
let result = coex::order_team::update_price_of_filled_buy_order(
|
|
&coin_price_vec,
|
|
&exchange_info_vec,
|
|
&trade_fee_vec,
|
|
)
|
|
.await;
|
|
|
|
// send Task#0 a message to notify running on
|
|
match result {
|
|
Ok(T) => {
|
|
tx_task21
|
|
.send(21)
|
|
.expect("The mpsc channel has been closed.");
|
|
}
|
|
Err(E) => {}
|
|
}
|
|
|
|
// sleep as much as the loop recurs per 1 second if all operation finished within 1 second.
|
|
elapsed_time = instant.elapsed().as_millis();
|
|
|
|
if 250 > elapsed_time {
|
|
sleep(Duration::from_millis((250 - elapsed_time) as u64)).await;
|
|
}
|
|
}
|
|
});
|
|
|
|
// Task#22: monitoring_open_sell_order
|
|
unsafe {
|
|
if RUNNING_MODE == REAL || RUNNING_MODE == TEST {
|
|
tokio::task::spawn(async move {
|
|
sleep(Duration::from_secs(30)).await;
|
|
let mut elapsed_time = 0;
|
|
loop {
|
|
let instant = Instant::now();
|
|
let client = ClientBuilder::new()
|
|
.timeout(tokio::time::Duration::from_millis(3000))
|
|
.build()
|
|
.unwrap();
|
|
let exchange_info_vec = rx4_exchange_info_map.borrow().clone();
|
|
let trade_fee_vec = rx4_tradefee_map.borrow().clone();
|
|
let result = coex::order_team::monitoring_open_sell_order(
|
|
&client,
|
|
&exchange_info_vec,
|
|
&trade_fee_vec,
|
|
)
|
|
.await;
|
|
|
|
// send Task#0 a message to notify running on
|
|
match result {
|
|
Ok(T) => {
|
|
tx_task22
|
|
.send(22)
|
|
.expect("The mpsc channel has been closed.");
|
|
}
|
|
Err(E) => {}
|
|
}
|
|
|
|
// sleep as much as the loop recurs per 1 second if all operation finished within 1 second.
|
|
elapsed_time = instant.elapsed().as_millis();
|
|
if 250 > elapsed_time {
|
|
sleep(Duration::from_millis((250 - elapsed_time) as u64)).await;
|
|
}
|
|
}
|
|
});
|
|
}
|
|
}
|
|
|
|
// Task#23: monitoring_filled_sell_order
|
|
tokio::task::spawn(async move {
|
|
let mut elapsed_time = 0;
|
|
loop {
|
|
let instant = Instant::now();
|
|
let client = ClientBuilder::new()
|
|
.timeout(tokio::time::Duration::from_millis(3000))
|
|
.build()
|
|
.unwrap();
|
|
let result = coex::order_team::monitoring_filled_sell_order(&client).await;
|
|
|
|
// send Task#0 a message to notify running on
|
|
match result {
|
|
Ok(T) => {
|
|
tx_task23
|
|
.send(23)
|
|
.expect("The mpsc channel has been closed.");
|
|
}
|
|
Err(E) => {}
|
|
}
|
|
|
|
// sleep as much as the loop recurs per 1 second if all operation finished within 1 second.
|
|
elapsed_time = instant.elapsed().as_millis();
|
|
if 200 > elapsed_time {
|
|
sleep(Duration::from_millis((200 - elapsed_time) as u64)).await;
|
|
}
|
|
}
|
|
});
|
|
|
|
// Task#24: monitoring_scoreboard
|
|
tokio::task::spawn(async move {
|
|
let mut elapsed_time = 0;
|
|
loop {
|
|
let instant = Instant::now();
|
|
let mut all_data = AllData::new();
|
|
|
|
// realtime price data
|
|
all_data.rt_price_1m_vec = rx4_rt_price_1m_map.borrow().clone();
|
|
|
|
let result = coex::exchange_team::monitoring_scoreboard(&all_data).await;
|
|
|
|
// send Task#0 a message to notify running on
|
|
match result {
|
|
Ok(T) => {
|
|
tx_task24
|
|
.send(24)
|
|
.expect("The mpsc channel has been closed.");
|
|
}
|
|
Err(E) => {}
|
|
}
|
|
|
|
// sleep as much as the loop recurs per 1 second if all operation finished within 1 second.
|
|
elapsed_time = instant.elapsed().as_millis();
|
|
if 200 > elapsed_time {
|
|
sleep(Duration::from_millis((200 - elapsed_time) as u64)).await;
|
|
}
|
|
}
|
|
});
|
|
|
|
// Task#25: update current_total_usdt and available_usdt
|
|
tokio::task::spawn(async move {
|
|
let mut previous_result = false;
|
|
let mut elapsed_time = 0;
|
|
loop {
|
|
let instant = Instant::now();
|
|
let client = ClientBuilder::new()
|
|
.timeout(tokio::time::Duration::from_millis(3000))
|
|
.build()
|
|
.unwrap();
|
|
let result =
|
|
coex::assets_managing_team::monitoring_asset_usdt(&mut previous_result, &client)
|
|
.await;
|
|
|
|
// send Task#0 a message to notify running on
|
|
match result {
|
|
Ok(T) => {
|
|
tx_task25
|
|
.send(25)
|
|
.expect("The mpsc channel has been closed.");
|
|
}
|
|
Err(E) => {
|
|
log::error!("{}", E);
|
|
}
|
|
}
|
|
|
|
// sleep as much as the loop recurs per 300ms if all operation finished within 300ms.
|
|
elapsed_time = instant.elapsed().as_millis();
|
|
if 250 > elapsed_time {
|
|
sleep(Duration::from_millis((250 - elapsed_time) as u64)).await;
|
|
}
|
|
}
|
|
});
|
|
|
|
// Task#26: update kelly_criterion
|
|
tokio::task::spawn(async move {
|
|
let mut elapsed_time = 0;
|
|
loop {
|
|
let instant = Instant::now();
|
|
let result = coex::assets_managing_team::update_kelly_criterion().await;
|
|
|
|
// send Task#0 a message to notify running on
|
|
match result {
|
|
Ok(T) => {
|
|
tx_task26
|
|
.send(26)
|
|
.expect("The mpsc channel has been closed.");
|
|
}
|
|
Err(E) => {
|
|
log::error!("{}", E);
|
|
}
|
|
}
|
|
|
|
// sleep as much as the loop recurs per 1 minutes
|
|
elapsed_time = instant.elapsed().as_secs();
|
|
if 60 > elapsed_time {
|
|
sleep(Duration::from_secs((60 - elapsed_time) as u64)).await;
|
|
}
|
|
}
|
|
});
|
|
|
|
// Task#27: request delist symbols
|
|
tokio::task::spawn(async move {
|
|
let mut elapsed_time = 0;
|
|
loop {
|
|
let instant = Instant::now();
|
|
let client = ClientBuilder::new()
|
|
.timeout(tokio::time::Duration::from_millis(3000))
|
|
.build()
|
|
.unwrap();
|
|
let tx1_changed = local_epoch_rx2.changed().await;
|
|
let tx2_changed = epoch_difference_rx2.changed().await;
|
|
let local_epoch = *local_epoch_rx2.borrow();
|
|
let difference_epoch = *epoch_difference_rx2.borrow();
|
|
|
|
let result = request_others::request_delist_symbols(
|
|
API_KEY,
|
|
SECRET_KEY,
|
|
local_epoch,
|
|
difference_epoch,
|
|
&client,
|
|
)
|
|
.await;
|
|
|
|
tx_task27
|
|
.send(27)
|
|
.expect("The mpsc channel has been closed.");
|
|
|
|
// sleep as much as the loop recurs per 300 seconds if all operation finished within 300 seconds.
|
|
elapsed_time = instant.elapsed().as_secs();
|
|
if 300 > elapsed_time {
|
|
sleep(Duration::from_secs((300 - elapsed_time) as u64)).await;
|
|
}
|
|
}
|
|
});
|
|
|
|
loop {}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
fn program_setting() {
|
|
let matches = Command::new("Tradingbot")
|
|
.arg(
|
|
arg!(log_level: -l --log <level> "Select log level: trace, debug, info, warn, error")
|
|
.default_value("error"),
|
|
)
|
|
.arg(arg!(mode: -m --mode <mode> "Select mode: real, simul, test").required(true))
|
|
.get_matches();
|
|
|
|
// set log level
|
|
set_up_color_terminal();
|
|
if let Some(level) = matches.get_one::<String>("log_level") {
|
|
match level.clone().to_ascii_lowercase().as_str() {
|
|
"trace" => simple_logger::init_with_level(Level::Trace).unwrap(),
|
|
"debug" => simple_logger::init_with_level(Level::Debug).unwrap(),
|
|
"info" => simple_logger::init_with_level(Level::Info).unwrap(),
|
|
"warn" => simple_logger::init_with_level(Level::Warn).unwrap(),
|
|
"error" => simple_logger::init_with_level(Level::Error).unwrap(),
|
|
_ => {
|
|
simple_logger::init_with_level(Level::Error).unwrap();
|
|
log::error!("wrong log level argument.");
|
|
std::process::exit(0);
|
|
}
|
|
}
|
|
}
|
|
|
|
// set mode
|
|
let mut mode: String = matches.get_one::<String>("mode").unwrap().clone();
|
|
unsafe {
|
|
match mode.to_ascii_lowercase().as_str() {
|
|
"real" => {
|
|
RUNNING_MODE = RunningMode::REAL;
|
|
println!("*** REAL MODE ***");
|
|
}
|
|
"simul" => {
|
|
RUNNING_MODE = RunningMode::SIMUL;
|
|
println!("*** SIMULATION MODE ***");
|
|
}
|
|
"test" => {
|
|
RUNNING_MODE = RunningMode::TEST;
|
|
println!("*** TEST MODE ***");
|
|
}
|
|
_ => {
|
|
log::error!("wrong mode argument.");
|
|
std::process::exit(0);
|
|
}
|
|
}
|
|
}
|
|
}
|