tradingbot/src/main.rs
2023-11-19 01:58:09 +09:00

1583 lines
66 KiB
Rust

#![allow(unused)]
#![allow(warnings)]
use crate::coin_health_check_team::*;
use crate::request_candles::CandleData;
use crate::request_others::{CoinPriceData, ExchangeInfo, TradeFee};
use crate::server_health_check_team::ServerHealth;
use crate::strategy_team::AllData;
use crate::time_checking_team::UserTime;
use crate::value_estimation_team::datapoints::price_data::RealtimePriceData;
use reqwest::{Client, ClientBuilder};
use sqlx::{mysql::*, Connection, Executor, FromRow, Row};
use std::{
io::{self, Write},
sync::Arc,
};
use tokio::{fs::*, join, sync::mpsc, sync::watch, sync::Mutex, task::*, time::*};
use tradingbot::{RunningMode::*, *};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
initialization::initialization().await;
// // let c1 = vec!(("name", "Shima Nabil"));
// // println!("{:?}", database_control::update_record(&table_name, &c1, &("grade", "3")).await);
// //database_control::exists_table(&table_name).await;
// // let table_name = String::from("all24hstatistics");
// // let result = database_control::count_rows(&table_name).await;
// // println!("{:?}", result);
// // let rows = sqlx::query("select * from serverhealth")
// // .map(|r| {
// // a.id = r.get::<i32, _>("id");
// // a.waiting_maximum = r.get::<i32, _>("waiting_maximum");
// // a.server_on = r.get::<bool, _>("server_on");
// // a.ping_on = r.get::<bool, _>("ping_on");
// // a.wallet_system_on = r.get::<bool, _>("wallet_system_on");
// //
// // })
// // .collect::<Vec<String>>()
// // .fetch(&pool).await;
// // let str_result = rows
// // .iter()
// // .map(|r| format!("{} {}", r.get::<i32, _>("id"), r.get::<bool, _>("server_on")))
// // .collect::<Vec<String>>()
// // .join(", ");
// // println!("{:?}", rows);
// //
let (tx_task1, mut rx_task0) = mpsc::unbounded_channel::<u32>();
let tx_task2 = tx_task1.clone(); // for Task#2
let tx_task3 = tx_task1.clone(); // for Task#3
let tx_task4 = tx_task1.clone(); // for Task#4
let tx_task5 = tx_task1.clone(); // for Task#5
let tx_task6 = tx_task1.clone(); // for Task#6
let tx_task7 = tx_task1.clone(); // for Task#7
let tx_task8 = tx_task1.clone(); // for Task#8
let tx_task9 = tx_task1.clone(); // for Task#9
let tx_task10 = tx_task1.clone(); // for Task#10
let tx_task11 = tx_task1.clone(); // for Task#11
let tx_task12 = tx_task1.clone(); // for Task#12
let tx_task13 = tx_task1.clone(); // for Task#13
let tx_task14 = tx_task1.clone(); // for Task#14
let tx_task15 = tx_task1.clone(); // for Task#15
let tx_task16 = tx_task1.clone(); // for Task#16
let tx_task17 = tx_task1.clone(); // for Task#17
let tx_task18 = tx_task1.clone(); // for Task#18
let tx_task19 = tx_task1.clone(); // for Task#19
let tx_task20 = tx_task1.clone(); // for Task#20
let tx_task21 = tx_task1.clone(); // for Task#21
let tx_task22 = tx_task1.clone(); // for Task#22
let tx_task23 = tx_task1.clone(); // for Task#23
let tx_task24 = tx_task1.clone(); // for Task#24
let tx_task25 = tx_task1.clone(); // for Task#25
let tx_task26 = tx_task1.clone(); // for Task#26
let (tx1, mut rx1_1) = watch::channel(0); // local_epoch
let mut rx1_2 = rx1_1.clone();
let mut rx1_3 = rx1_1.clone();
let mut rx1_4 = rx1_1.clone();
let mut rx1_5 = rx1_1.clone();
let mut rx1_6 = rx1_1.clone();
let mut rx1_7 = rx1_1.clone();
let (tx2, mut rx2) = watch::channel(0); // epoch_difference
// trade fee data
let mut tradefee_vec: Vec<TradeFee> = Vec::new(); // (symbol, makerCommission, takerCommission)
let (tx_tradefee_vec, mut rx_tradefee_vec) = watch::channel(tradefee_vec);
let mut rx2_tradefee_vec = rx_tradefee_vec.clone();
let mut rx3_tradefee_vec = rx_tradefee_vec.clone();
let mut rx4_tradefee_vec = rx_tradefee_vec.clone();
let mut rx5_tradefee_vec = rx_tradefee_vec.clone();
// valid usdt trade data
let mut valid_usdt_trade_vec: Vec<String> = Vec::new(); // symbol
let (tx_valid_usdt_trade_vec, mut rx_valid_usdt_trade_vec) =
watch::channel(valid_usdt_trade_vec);
let mut rx2_valid_usdt_trade_vec = rx_valid_usdt_trade_vec.clone();
let mut rx3_valid_usdt_trade_vec = rx_valid_usdt_trade_vec.clone();
let mut rx4_valid_usdt_trade_vec = rx_valid_usdt_trade_vec.clone();
// price per second data and channels
let mut price_vec: Vec<CoinPriceData> = Vec::new(); // (symbol, price)
let (tx_price_vec, mut rx_price_vec) = watch::channel(price_vec);
let mut rx3_price_vec = rx_price_vec.clone();
let mut rx5_price_vec = rx_price_vec.clone();
// candle data from endpoint and channels
let mut candle_1m_vec: Vec<(String, Vec<CandleData>)> = Vec::new(); // (symbol, Vec<CandleData)>
let (tx_candle_1m_vec, mut rx_candle_1m_vec) = watch::channel(candle_1m_vec);
let mut candle_30m_vec: Vec<(String, Vec<CandleData>)> = Vec::new();
let (tx_candle_30m_vec, mut rx_candle_30m_vec) = watch::channel(candle_30m_vec);
let mut candle_1d_vec: Vec<(String, Vec<CandleData>)> = Vec::new();
let (tx_candle_1d_vec, mut rx_candle_1d_vec) = watch::channel(candle_1d_vec);
let mut candle_1w_vec: Vec<(String, Vec<CandleData>)> = Vec::new();
let (tx_candle_1w_vec, mut rx_candle_1w_vec) = watch::channel(candle_1w_vec);
let mut candle_1mon_vec: Vec<(String, Vec<CandleData>)> = Vec::new();
let (tx_candle_1mon_vec, mut rx_candle_1mon_vec) = watch::channel(candle_1mon_vec);
// real-time reflected price data and channels
let mut rt_price_1m_vec: Vec<(String, Vec<RealtimePriceData>)> = Vec::new(); // (symbol, Vec<RealtimePriceData)>
let (tx_rt_price_1m_vec, mut rx_rt_price_1m_vec) = watch::channel(rt_price_1m_vec);
let mut rx2_rt_price_1m_vec = rx_rt_price_1m_vec.clone();
let mut rx3_rt_price_1m_vec = rx_rt_price_1m_vec.clone();
let mut rx4_rt_price_1m_vec = rx_rt_price_1m_vec.clone();
let mut rx5_rt_price_1m_vec = rx_rt_price_1m_vec.clone();
let mut rt_price_30m_vec: Vec<(String, Vec<RealtimePriceData>)> = Vec::new();
let (tx_rt_price_30m_vec, mut rx_rt_price_30m_vec) = watch::channel(rt_price_30m_vec);
let mut rx2_rt_price_30m_vec = rx_rt_price_30m_vec.clone();
let mut rx3_rt_price_30m_vec = rx_rt_price_30m_vec.clone();
let mut rx4_rt_price_30m_vec = rx_rt_price_30m_vec.clone();
let mut rx5_rt_price_30m_vec = rx_rt_price_30m_vec.clone();
let mut rt_price_1d_vec: Vec<(String, Vec<RealtimePriceData>)> = Vec::new();
let (tx_rt_price_1d_vec, mut rx_rt_price_1d_vec) = watch::channel(rt_price_1d_vec);
let mut rx2_rt_price_1d_vec = rx_rt_price_1d_vec.clone();
let mut rx3_rt_price_1d_vec = rx_rt_price_1d_vec.clone();
let mut rx4_rt_price_1d_vec = rx_rt_price_1d_vec.clone();
let mut rt_price_1w_vec: Vec<(String, Vec<RealtimePriceData>)> = Vec::new();
let (tx_rt_price_1w_vec, mut rx_rt_price_1w_vec) = watch::channel(rt_price_1w_vec);
let mut rx2_rt_price_1w_vec = rx_rt_price_1w_vec.clone();
let mut rx3_rt_price_1w_vec = rx_rt_price_1w_vec.clone();
let mut rx4_rt_price_1w_vec = rx_rt_price_1w_vec.clone();
let mut rt_price_1mon_vec: Vec<(String, Vec<RealtimePriceData>)> = Vec::new();
let (tx_rt_price_1mon_vec, mut rx_rt_price_1mon_vec) = watch::channel(rt_price_1mon_vec);
let mut rx2_rt_price_1mon_vec = rx_rt_price_1mon_vec.clone();
let mut rx3_rt_price_1mon_vec = rx_rt_price_1mon_vec.clone();
let mut rx4_rt_price_1mon_vec = rx_rt_price_1mon_vec.clone();
// TEMA data
// let mut tema3_1m_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new(); // Vec<(symbol, Vec<(price, closetime)>)>
// let (tx_tema3_1m_data, mut rx_tema3_1m_data) = watch::channel(tema3_1m_data);
// let mut tema3_30m_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new();
// let (tx_tema3_30m_data, mut rx_tema3_30m_data) = watch::channel(tema3_30m_data);
// let mut tema3_1d_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new();
// let (tx_tema3_1d_data, mut rx_tema3_1d_data) = watch::channel(tema3_1d_data);
// let mut tema3_1w_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new();
// let (tx_tema3_1w_data, mut rx_tema3_1w_data) = watch::channel(tema3_1w_data);
// let mut tema3_1mon_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new();
// let (tx_tema3_1mon_data, mut rx_tema3_1mon_data) = watch::channel(tema3_1mon_data);
// let mut tema10_1m_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new(); // Vec<(symbol, Vec<(price, closetime)>)>
// let (tx_tema10_1m_data, mut rx_tema10_1m_data) = watch::channel(tema10_1m_data);
// let mut tema10_30m_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new();
// let (tx_tema10_30m_data, mut rx_tema10_30m_data) = watch::channel(tema10_30m_data);
// let mut tema10_1d_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new();
// let (tx_tema10_1d_data, mut rx_tema10_1d_data) = watch::channel(tema10_1d_data);
// let mut tema10_1w_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new();
// let (tx_tema10_1w_data, mut rx_tema10_1w_data) = watch::channel(tema10_1w_data);
// let mut tema10_1mon_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new();
// let (tx_tema10_1mon_data, mut rx_tema10_1mon_data) = watch::channel(tema10_1mon_data);
// let mut tema30_1m_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new(); // Vec<(symbol, Vec<(price, closetime)>)>
// let (tx_tema30_1m_data, mut rx_tema30_1m_data) = watch::channel(tema30_1m_data);
// let mut tema30_30m_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new();
// let (tx_tema30_30m_data, mut rx_tema30_30m_data) = watch::channel(tema30_30m_data);
// let mut tema30_1d_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new();
// let (tx_tema30_1d_data, mut rx_tema30_1d_data) = watch::channel(tema30_1d_data);
// let mut tema30_1w_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new();
// let (tx_tema30_1w_data, mut rx_tema30_1w_data) = watch::channel(tema30_1w_data);
// let mut tema30_1mon_data: Vec<(String, Vec<(f64, i64)>)> = Vec::new();
// let (tx_tema30_1mon_data, mut rx_tema30_1mon_data) = watch::channel(tema30_1mon_data);
// Exchange Information data
let mut exchange_info_data: Vec<ExchangeInfo> = Vec::new();
let (tx_exchange_info_data, mut rx_exchange_info_data) = watch::channel(exchange_info_data);
let mut rx2_exchange_info_data = rx_exchange_info_data.clone();
let mut rx3_exchange_info_data = rx_exchange_info_data.clone();
let mut rx4_exchange_info_data = rx_exchange_info_data.clone();
let mut rx5_exchange_info_data = rx_exchange_info_data.clone();
let mut rx6_exchange_info_data = rx_exchange_info_data.clone();
{
if RUNNING_MODE == RunningMode::REAL {
println!("*** REAL MODE ***");
} else if RUNNING_MODE == RunningMode::TEST {
println!("*** TEST MODE ***");
} else if RUNNING_MODE == RunningMode::SIMUL {
println!("*** SIMULATION MODE ***");
}
// pre-fetching candle data: 30m, 1d, 1w and 1mon
print!("pre-fetching candle data: 30m, 1d, 1w and 1mon..");
std::io::stdout().flush();
let instant = Instant::now();
let interval_30m = String::from("30m");
let interval_1d = String::from("1d");
let interval_1w = String::from("1w");
let interval_1mon = String::from("1mon");
let mut candle_30m_vec_temp: Vec<(String, Vec<CandleData>)> = Vec::new();
let mut candle_1d_vec_temp: Vec<(String, Vec<CandleData>)> = Vec::new();
let mut candle_1w_vec_temp: Vec<(String, Vec<CandleData>)> = Vec::new();
let mut candle_1mon_vec_temp: Vec<(String, Vec<CandleData>)> = Vec::new();
request_candles::fetch_candle_parallel(&interval_30m, &mut candle_30m_vec_temp).await;
request_candles::fetch_candle_parallel(&interval_1d, &mut candle_1d_vec_temp).await;
request_candles::fetch_candle_parallel(&interval_1w, &mut candle_1w_vec_temp).await;
request_candles::fetch_candle_parallel(&interval_1mon, &mut candle_1mon_vec_temp).await;
tx_candle_30m_vec.send_modify(|vec| *vec = candle_30m_vec_temp);
tx_candle_1d_vec.send_modify(|vec| *vec = candle_1d_vec_temp);
tx_candle_1w_vec.send_modify(|vec| *vec = candle_1w_vec_temp);
tx_candle_1mon_vec.send_modify(|vec| *vec = candle_1mon_vec_temp);
// sleep as much as the loop recurs per 60 seconds, expecting child threads will have finished within 60 seconds.
println!("Ok..");
if RUNNING_MODE == REAL || RUNNING_MODE == SIMUL {
let mut elapsed_time = instant.elapsed().as_secs();
let mut remaining_time: u64 = 60 - elapsed_time;
while remaining_time > 0 && 60 > remaining_time {
print!("\rstart tradingbot in {} seconds", remaining_time);
io::stdout().flush();
elapsed_time = instant.elapsed().as_secs();
loop {
let temp_elapsed_time = instant.elapsed().as_secs();
let is_overflowed = remaining_time.overflowing_sub(1).1;
if is_overflowed == true {
break;
}
if elapsed_time < temp_elapsed_time {
remaining_time -= 1;
break;
}
}
}
println!("\nTradingbot is running");
}
}
// Task#0: monitoring tasks
tokio::task::spawn(async move {
loop {
let result = rx_task0.recv().await;
match result {
Some(1) => {
print!("\r1[■] 2[ ] 3[ ] 4[ ] 5[ ] 6[ ] 7[ ] 8[ ] 9[ ] 12[ ] 13[ ] 14[ ] 15[ ] 16[ ] 17[ ] 18[ ] 19[ ] 20[ ] 21[ ] 22[ ] 23[ ] 24[ ] 25[ ] 26[ ]");
io::stdout().flush();
}
Some(2) => {
print!("\r1[ ] 2[■] 3[ ] 4[ ] 5[ ] 6[ ] 7[ ] 8[ ] 9[ ] 12[ ] 13[ ] 14[ ] 15[ ] 16[ ] 17[ ] 18[ ] 19[ ] 20[ ] 21[ ] 22[ ] 23[ ] 24[ ] 25[ ] 26[ ]");
io::stdout().flush();
}
Some(3) => {
print!("\r1[ ] 2[ ] 3[■] 4[ ] 5[ ] 6[ ] 7[ ] 8[ ] 9[ ] 12[ ] 13[ ] 14[ ] 15[ ] 16[ ] 17[ ] 18[ ] 19[ ] 20[ ] 21[ ] 22[ ] 23[ ] 24[ ] 25[ ] 26[ ]");
io::stdout().flush();
}
Some(4) => {
print!("\r1[ ] 2[ ] 3[ ] 4[■] 5[ ] 6[ ] 7[ ] 8[ ] 9[ ] 12[ ] 13[ ] 14[ ] 15[ ] 16[ ] 17[ ] 18[ ] 19[ ] 20[ ] 21[ ] 22[ ] 23[ ] 24[ ] 25[ ] 26[ ]");
io::stdout().flush();
}
Some(5) => {
print!("\r1[ ] 2[ ] 3[ ] 4[ ] 5[■] 6[ ] 7[ ] 8[ ] 9[ ] 12[ ] 13[ ] 14[ ] 15[ ] 16[ ] 17[ ] 18[ ] 19[ ] 20[ ] 21[ ] 22[ ] 23[ ] 24[ ] 25[ ] 26[ ]");
io::stdout().flush();
}
Some(6) => {
print!("\r1[ ] 2[ ] 3[ ] 4[ ] 5[ ] 6[■] 7[ ] 8[ ] 9[ ] 12[ ] 13[ ] 14[ ] 15[ ] 16[ ] 17[ ] 18[ ] 19[ ] 20[ ] 21[ ] 22[ ] 23[ ] 24[ ] 25[ ] 26[ ]");
io::stdout().flush();
}
Some(7) => {
print!("\r1[ ] 2[ ] 3[ ] 4[ ] 5[ ] 6[ ] 7[■] 8[] 9[ ] 12[ ] 13[ ] 14[ ] 15[ ] 16[ ] 17[ ] 18[ ] 19[ ] 20[ ] 21[ ] 22[ ] 23[ ] 24[ ] 25[ ] 26[ ]");
io::stdout().flush();
}
Some(8) => {
print!("\r1[ ] 2[ ] 3[ ] 4[ ] 5[ ] 6[ ] 7[ ] 8[■ 9[ ] 12[ ] 13[ ] 14[ ] 15[ ] 16[ ] 17[ ] 18[ ] 19[ ] 20[ ] 21[ ] 22[ ] 23[ ] 24[ ] 25[ ] 26[ ]");
io::stdout().flush();
}
Some(9) => {
print!("\r1[ ] 2[ ] 3[ ] 4[ ] 5[ ] 6[ ] 7[ ] 8[ ] 9[■] 12[ ] 13[ ] 14[ ] 15[ ] 16[ ] 17[ ] 18[ ] 19[ ] 20[ ] 21[ ] 22[ ] 23[ ] 24[ ] 25[ ] 26[ ]");
io::stdout().flush();
}
Some(12) => {
print!("\r1[ ] 2[ ] 3[ ] 4[ ] 5[ ] 6[ ] 7[ ] 8[ ] 9[ ] 12[■] 13[ ] 14[ ] 15[ ] 16[ ] 17[ ] 18[ ] 19[ ] 20[ ] 21[ ] 22[ ] 23[ ] 24[ ] 25[ ] 26[ ]");
io::stdout().flush();
}
Some(13) => {
print!("\r1[ ] 2[ ] 3[ ] 4[ ] 5[ ] 6[ ] 7[ ] 8[ ] 9[ ] 12[ ] 13[■] 14[ ] 15[ ] 16[ ] 17[ ] 18[ ] 19[ ] 20[ ] 21[ ] 22[ ] 23[ ] 24[ ] 25[ ] 26[ ]");
io::stdout().flush();
}
Some(14) => {
print!("\r1[ ] 2[ ] 3[ ] 4[ ] 5[ ] 6[ ] 7[ ] 8[ ] 9[ ] 12[ ] 13[ ] 14[■] 15[ ] 16[ ] 17[ ] 18[ ] 19[ ] 20[ ] 21[ ] 22[ ] 23[ ] 24[ ] 25[ ] 26[ ]");
io::stdout().flush();
}
Some(15) => {
print!("\r1[ ] 2[ ] 3[ ] 4[ ] 5[ ] 6[ ] 7[ ] 8[ ] 9[ ] 12[ ] 13[ ] 14[ ] 15[■] 16[ ] 17[ ] 18[ ] 19[ ] 20[ ] 21[ ] 22[ ] 23[ ] 24[ ] 25[ ] 26[ ]");
io::stdout().flush();
}
Some(16) => {
print!("\r1[ ] 2[ ] 3[ ] 4[ ] 5[ ] 6[ ] 7[ ] 8[ ] 9[ ] 12[ ] 13[ ] 14[ ] 15[ ] 16[■] 17[ ] 18[ ] 19[ ] 20[ ] 21[ ] 22[ ] 23[ ] 24[ ] 25[ ] 26[ ]");
io::stdout().flush();
}
Some(17) => {
print!("\r1[ ] 2[ ] 3[ ] 4[ ] 5[ ] 6[ ] 7[ ] 8[ ] 9[ ] 12[ ] 13[ ] 14[ ] 15[ ] 16[ ] 17[■] 18[ ] 19[ ] 20[ ] 21[ ] 22[ ] 23[ ] 24[ ] 25[ ] 26[ ]");
io::stdout().flush();
}
Some(18) => {
print!("\r1[ ] 2[ ] 3[ ] 4[ ] 5[ ] 6[ ] 7[ ] 8[ ] 9[ ] 12[ ] 13[ ] 14[ ] 15[ ] 16[ ] 17[ ] 18[■] 19[ ] 20[ ] 21[ ] 22[ ] 23[ ] 24[ ] 25[ ] 26[ ]");
io::stdout().flush();
}
Some(19) => {
print!("\r1[ ] 2[ ] 3[ ] 4[ ] 5[ ] 6[ ] 7[ ] 8[ ] 9[ ] 12[ ] 13[ ] 14[ ] 15[ ] 16[ ] 17[ ] 18[ ] 19[■] 20[ ] 21[ ] 22[ ] 23[ ] 24[ ] 25[ ] 26[ ]");
io::stdout().flush();
}
Some(20) => {
print!("\r1[ ] 2[ ] 3[ ] 4[ ] 5[ ] 6[ ] 7[ ] 8[ ] 9[ ] 12[ ] 13[ ] 14[ ] 15[ ] 16[ ] 17[ ] 18[ ] 19[ ] 20[■] 21[ ] 22[ ] 23[ ] 24[ ] 25[ ] 26[ ]");
io::stdout().flush();
}
Some(21) => {
print!("\r1[ ] 2[ ] 3[ ] 4[ ] 5[ ] 6[ ] 7[ ] 8[ ] 9[ ] 12[ ] 13[ ] 14[ ] 15[ ] 16[ ] 17[ ] 18[ ] 19[ ] 20[ ] 21[■] 22[ ] 23[ ] 24[ ] 25[ ] 26[ ]");
io::stdout().flush();
}
Some(22) => {
print!("\r1[ ] 2[ ] 3[ ] 4[ ] 5[ ] 6[ ] 7[ ] 8[ ] 9[ ] 12[ ] 13[ ] 14[ ] 15[ ] 16[ ] 17[ ] 18[ ] 19[ ] 20[ ] 21[ ] 22[■] 23[ ] 24[ ] 25[ ] 26[ ]");
io::stdout().flush();
}
Some(23) => {
print!("\r1[ ] 2[ ] 3[ ] 4[ ] 5[ ] 6[ ] 7[ ] 8[ ] 9[ ] 12[ ] 13[ ] 14[ ] 15[ ] 16[ ] 17[ ] 18[ ] 19[ ] 20[ ] 21[ ] 22[ ] 23[■] 24[ ] 25[ ] 26[ ]");
io::stdout().flush();
}
Some(24) => {
print!("\r1[ ] 2[ ] 3[ ] 4[ ] 5[ ] 6[ ] 7[ ] 8[ ] 9[ ] 12[ ] 13[ ] 14[ ] 15[ ] 16[ ] 17[ ] 18[ ] 19[ ] 20[ ] 21[ ] 22[ ] 23[ ] 24[■] 25[ ] 26[ ]");
io::stdout().flush();
}
Some(25) => {
print!("\r1[ ] 2[ ] 3[ ] 4[ ] 5[ ] 6[ ] 7[ ] 8[ ] 9[ ] 12[ ] 13[ ] 14[ ] 15[ ] 16[ ] 17[ ] 18[ ] 19[ ] 20[ ] 21[ ] 22[ ] 23[ ] 24[ ] 25[■] 26[ ]");
io::stdout().flush();
}
Some(26) => {
print!("\r1[ ] 2[ ] 3[ ] 4[ ] 5[ ] 6[ ] 7[ ] 8[ ] 9[ ] 12[ ] 13[ ] 14[ ] 15[ ] 16[ ] 17[ ] 18[ ] 19[ ] 20[ ] 21[ ] 22[ ] 23[ ] 24[ ] 25[ ] 26[■]");
io::stdout().flush();
}
Some(_) => {}
None => {}
}
sleep(Duration::from_millis(50)).await;
}
});
// Task#1: check server time and health
tokio::task::spawn(async move {
let client = ClientBuilder::new()
.timeout(Duration::from_millis(1000))
.build()
.unwrap();
let mut usertime = UserTime::new();
let mut serverhealth = ServerHealth::new();
let mut prev_server_epoch: u128 = 0;
let mut epoch_difference: f64 = 0.0;
let mut epoch_difference_vec: Vec<f64> = Vec::new();
let mut epoch_mean: f64 = 0.0;
let mut elapsed_time: u128 = 0;
loop {
let instant = Instant::now();
// let mut usertime_mutex = usertime_arc.lock().await;
// server_health_check_team::execute_server_health_check(&mut usertime_mutex).await;
// time_checking_team::execute_time_check(&mut usertime_mutex).await;
let result = server_health_check_team::execute_server_health_check(
&mut usertime,
&mut serverhealth,
&client,
)
.await;
while let Err(E) = time_checking_team::execute_time_check(&mut usertime, &client).await
{
sleep(Duration::from_millis((100))).await;
while let Err(e) = server_health_check_team::execute_server_health_check(
&mut usertime,
&mut serverhealth,
&client,
)
.await
{}
}
match result {
Ok(T) => {
tx1.send(usertime.local_epoch)
.expect("tx1-rx1 channel has been closed.");
tx2.send(usertime.epoch_difference)
.expect("tx2-rx2 channel has been closed.");
tx_task1.send(1).expect("The mpsc channel has been closed.");
}
Err(E) => {}
}
// sleep as much as the loop recurs per 1 second if all operation finished within 1 second.
// considering RTT and to make the wait time 1 second, latency each second for every 30 seconds is averaged and subtracted with wait time.
elapsed_time = instant.elapsed().as_millis();
if 1_000 > elapsed_time {
if prev_server_epoch != 0 {
epoch_mean = 0.0;
epoch_difference =
(usertime.server_epoch as f64) - (prev_server_epoch as f64) - 1000.0; // milli second
if epoch_difference > 300.0 || epoch_difference < -300.0 {
epoch_difference_vec.clear();
epoch_difference = 0.0;
}
if epoch_difference_vec.len() == 10 {
epoch_difference_vec.rotate_left(1);
epoch_difference_vec.pop();
epoch_difference_vec.push(epoch_difference);
} else if epoch_difference_vec.len() < 10 {
epoch_difference_vec.push(epoch_difference);
}
for element in &epoch_difference_vec {
epoch_mean += *element;
}
epoch_mean = epoch_mean / (epoch_difference_vec.len() as f64);
}
if epoch_mean > 0.0 {
if elapsed_time > ((epoch_mean) as u128)
&& 1_000 > elapsed_time + ((epoch_mean) as u128)
{
sleep(Duration::from_millis(
(1_000 - elapsed_time - ((epoch_mean) as u128)) as u64,
))
.await;
} else {
sleep(Duration::from_millis((1_000 - elapsed_time) as u64)).await;
}
} else {
sleep(Duration::from_millis(
(1_000 - elapsed_time + ((epoch_mean.abs()) as u128)) as u64,
))
.await;
}
}
prev_server_epoch = usertime.server_epoch;
}
});
// Task#2: request trade fee
tokio::task::spawn(async move {
let client = ClientBuilder::new()
.timeout(tokio::time::Duration::from_millis(3000))
.build()
.unwrap();
let mut elapsed_time = 0;
loop {
let instant = Instant::now();
let tx1_changed = rx1_1.changed().await;
let tx2_changed = rx2.changed().await;
let local_epoch = *rx1_1.borrow();
let difference_epoch = *rx2.borrow();
match tx1_changed {
Ok(T) => match tx2_changed {
Ok(T) => {
let mut tradefee_vec_temp: Vec<TradeFee> = Vec::new();
let mut result;
loop {
result = request_others::request_trade_fee(
API_KEY,
SECRET_KEY,
local_epoch,
difference_epoch,
&client,
&mut tradefee_vec_temp,
)
.await;
if tradefee_vec_temp.len() == 0 {
sleep(Duration::from_secs(3)).await;
} else {
break;
}
}
match result {
Ok(T) => {
tx_tradefee_vec.send_modify(|vec| *vec = tradefee_vec_temp);
tx_task2.send(2).expect("The mpsc channel has been closed.");
}
Err(E) => {}
}
}
Err(E) => {
panic!("tx2-rx2 channel has been closed.")
}
},
Err(E) => {
panic!("tx1-rx1 channel has been closed.")
}
}
// sleep as much as the loop recurs per 60 seconds if all operation finished within 60 seconds.
elapsed_time = instant.elapsed().as_millis();
if 60_000 > elapsed_time {
sleep(Duration::from_millis((60_000 - elapsed_time) as u64)).await;
}
}
});
// Task#3: request lot stepsize and ticksize
tokio::task::spawn(async move {
let client = ClientBuilder::new()
.timeout(tokio::time::Duration::from_millis(3000))
.build()
.unwrap();
loop {
let mut exchange_info_data_temp: Vec<ExchangeInfo> = Vec::new();
let mut result;
loop {
result = coin_health_check_team::request_others::request_exchange_infomation(
&client,
&mut exchange_info_data_temp,
)
.await;
// retry
if exchange_info_data_temp.len() == 0 {
sleep(Duration::from_secs(3)).await;
} else {
break;
}
}
match result {
Ok(T) => {
tx_exchange_info_data.send_modify(|vec| *vec = exchange_info_data_temp);
tx_task3.send(3).expect("The mpsc channel has been closed.");
}
Err(E) => {}
}
sleep(Duration::from_secs(300)).await; // sleep for 5 mins
}
});
// Task#4: request 24h price changes,
// pick valid USDT Trades,
// filtering stop USDT Trades,
// monitor total_24h_change_profit_index,
// usdt_24h_change_profit_index,
// total_price_down_dist_index
tokio::task::spawn(async move {
sleep(Duration::from_secs(10)).await;
let client = ClientBuilder::new()
.timeout(tokio::time::Duration::from_millis(3000))
.build()
.unwrap();
let mut elapsed_time = 0;
loop {
let instant = Instant::now();
let result = request_others::request_24hr_ticker_price_change_statistics(&client).await;
match result {
Ok(T) => {
let exchange_info_vec = rx5_exchange_info_data.borrow().clone();
let mut valid_usdt_trade_vec_temp: Vec<String> = Vec::new();
let result = monitors::collect_valid_usde_trade(
&mut valid_usdt_trade_vec_temp,
&exchange_info_vec,
)
.await;
match result {
Ok(T) => {
tx_valid_usdt_trade_vec
.send_modify(|vec| *vec = valid_usdt_trade_vec_temp);
tx_task4.send(4).expect("The mpsc channel has been closed.");
}
Err(E) => {}
}
}
Err(E) => {
println!(">>> Failed to monitor usdt_24h_change_profit_index.");
}
}
// match result {
// Ok(T) => {
// let result = monitors::total_24h_change_profit_index().await;
// match result {
// Ok(T) => {},
// Err(E) => {
// println!(">>> Failed to monitor total_24h_change_profit_index.");
// }
// };
// let result = monitors::usdt_24h_change_profit_index().await;
// match result {
// Ok(T) => {},
// Err(E) => {
// println!(">>> Failed to monitor usdt_24h_change_profit_index.");
// }
// };
// let result = monitors::total_price_down_dist_index().await;
// match result {
// Ok(T) => {},
// Err(E) => {
// println!(">>> Failed to monitor total_price_down_dist_index.");
// }
// };
// },
// Err(E) => {
// println!(">>> Failed to fetch 24h price change data from endpoint or parse message.");
// }
// };
// sleep as much as the loop recurs per 30 seconds if all operation finished within 30 seconds.
elapsed_time = instant.elapsed().as_millis();
if 30_000 > elapsed_time {
sleep(Duration::from_millis((30_000 - elapsed_time) as u64)).await;
}
}
});
// Task#5: price per second
tokio::task::spawn(async move {
sleep(Duration::from_secs(20)).await;
let client = ClientBuilder::new()
.timeout(tokio::time::Duration::from_millis(1000))
.build()
.unwrap();
let mut server_epoch = 0;
let mut elapsed_time = 0;
loop {
let instant = Instant::now();
let mut price_vec_temp: Vec<CoinPriceData> = Vec::new();
let result = request_others::request_all_coin_price(&client, &mut price_vec_temp).await;
let mut price_vec_temp_c: Vec<CoinPriceData> = Vec::new();
match result {
Ok(T) => {
price_vec_temp_c = price_vec_temp.clone();
tx_price_vec.send_modify(|vec| *vec = price_vec_temp);
tx_task5.send(5).expect("The mpsc channel has been closed.");
}
Err(E) => {}
}
// Task#10: make realtime price data of 1m, 30m, 1d, 1w and 1mon
let valid_usdt_trade_vec = rx_valid_usdt_trade_vec.borrow().clone();
// 1m
let interval = String::from("1m");
let candle_1m_vec = rx_candle_1m_vec.borrow().clone();
let rt_price_1m_vec_read_temp: Vec<(String, Vec<RealtimePriceData>)> = Vec::new();
let mut rt_price_1m_vec_write_temp: Vec<(String, Vec<RealtimePriceData>)> = Vec::new();
let result = value_estimation_team::datapoints::price_data::update_realtime_price_data(
&interval,
&candle_1m_vec,
&rt_price_1m_vec_read_temp,
&mut rt_price_1m_vec_write_temp,
&price_vec_temp_c,
&valid_usdt_trade_vec,
)
.await;
match result {
Ok(T) => {
if tx_rt_price_1m_vec.is_closed() {
eprintln!("tx_rt_price_1m_vec has been closed!");
} else {
tx_rt_price_1m_vec.send_modify(|vec| *vec = rt_price_1m_vec_write_temp);
}
}
Err(E) => {}
}
// 30m
let interval = String::from("30m");
let candle_30m_vec = rx_candle_30m_vec.borrow().clone();
let rt_price_1m_vec = rx_rt_price_1m_vec.borrow().clone();
let mut rt_price_30m_vec_write_temp: Vec<(String, Vec<RealtimePriceData>)> = Vec::new();
if !rt_price_1m_vec.is_empty() {
let result =
value_estimation_team::datapoints::price_data::update_realtime_price_data(
&interval,
&candle_30m_vec,
&rt_price_1m_vec,
&mut rt_price_30m_vec_write_temp,
&price_vec_temp_c,
&valid_usdt_trade_vec,
)
.await;
match result {
Ok(T) => {
if tx_rt_price_30m_vec.is_closed() {
eprintln!("tx_rt_price_30m_vec has been closed!");
} else {
tx_rt_price_30m_vec
.send_modify(|vec| *vec = rt_price_30m_vec_write_temp);
}
}
Err(E) => {}
}
}
// 1d
let interval = String::from("1d");
let candle_1d_vec = rx_candle_1d_vec.borrow().clone();
let rt_price_30m_vec = rx_rt_price_30m_vec.borrow().clone();
let mut rt_price_1d_vec_write_temp: Vec<(String, Vec<RealtimePriceData>)> = Vec::new();
if !rt_price_30m_vec.is_empty() {
let result =
value_estimation_team::datapoints::price_data::update_realtime_price_data(
&interval,
&candle_1d_vec,
&rt_price_30m_vec,
&mut rt_price_1d_vec_write_temp,
&price_vec_temp_c,
&valid_usdt_trade_vec,
)
.await;
match result {
Ok(T) => {
if tx_rt_price_1d_vec.is_closed() {
eprintln!("tx_rt_price_1d_vec has been closed!");
} else {
tx_rt_price_1d_vec.send_modify(|vec| *vec = rt_price_1d_vec_write_temp);
}
}
Err(E) => {}
}
}
// { // 1w
// let interval = String::from("1w");
// let candle_1w_vec = rx_candle_1w_vec.borrow().clone();
// let rt_price_1d_vec = rx_rt_price_1d_vec.borrow().clone();
// let mut rt_price_1w_vec_write_temp: Vec<(String, Vec<RealtimePriceData>)> = Vec::new();
// let result = value_estimation_team::datapoints::price_data::update_realtime_price_data(&interval, &candle_1w_vec, &rt_price_1d_vec, &mut rt_price_1w_vec_write_temp, &price_vec_temp_c, &valid_usdt_trade_vec).await;
// match result {
// Ok(T) => {
// tx_rt_price_1w_vec.send_modify(|vec| *vec = rt_price_1w_vec_write_temp);
// tx_task10.send(10).expect("The mpsc channel has been closed.");
// }
// Err(E) => {}
// }
// }
// { // 1mon
// let interval = String::from("1mon");
// let candle_1mon_vec = rx_candle_1mon_vec.borrow().clone();
// let rt_price_1w_vec = rx_rt_price_1w_vec.borrow().clone();
// let mut rt_price_1mon_vec_write_temp: Vec<(String, Vec<RealtimePriceData>)> = Vec::new();
// let result = value_estimation_team::datapoints::price_data::update_realtime_price_data(&interval, &candle_1mon_vec, &rt_price_1w_vec, &mut rt_price_1mon_vec_write_temp, &price_vec_temp_c, &valid_usdt_trade_vec).await;
// match result {
// Ok(T) => {
// tx_rt_price_1mon_vec.send_modify(|vec| *vec = rt_price_1mon_vec_write_temp);
// tx_task10.send(10).expect("The mpsc channel has been closed.");
// }
// Err(E) => {}
// }
// }
// sleep as much as the loop recurs per 1 seconds if all operation finished within 1 seconds.
elapsed_time = instant.elapsed().as_millis();
if 500 > elapsed_time {
sleep(Duration::from_millis((500 - elapsed_time) as u64)).await;
}
}
});
// Task#6: fetch candle 1m
tokio::task::spawn(async move {
let mut elapsed_time = 0;
let interval = String::from("1m");
loop {
let instant = Instant::now();
let mut candle_1m_vec_temp: Vec<(String, Vec<CandleData>)> = Vec::new();
let result =
request_candles::fetch_candle_parallel(&interval, &mut candle_1m_vec_temp).await;
match result {
Ok(T) => {
tx_candle_1m_vec.send_modify(|vec| *vec = candle_1m_vec_temp);
tx_task6.send(6).expect("The mpsc channel has been closed.");
}
Err(E) => {}
}
// sleep as much as the loop recurs per 10 seconds, expecting child threads will have finished within 10 seconds.
elapsed_time = instant.elapsed().as_millis();
if 30_000 > elapsed_time {
// 10_000 for major trade
sleep(Duration::from_millis((30_000 - elapsed_time) as u64)).await;
}
}
});
// Task#7: fetch candle 30m
tokio::task::spawn(async move {
let mut elapsed_time = 0;
let interval = String::from("30m");
loop {
let instant = Instant::now();
let mut candle_30m_vec_temp: Vec<(String, Vec<CandleData>)> = Vec::new();
let result =
request_candles::fetch_candle_delay(&interval, &mut candle_30m_vec_temp).await;
// request_candles::fetch_candle_parallel(&interval, &mut candle_30m_vec_temp).await;
match result {
Ok(T) => {
tx_candle_30m_vec.send_modify(|vec| *vec = candle_30m_vec_temp);
tx_task7.send(7).expect("The mpsc channel has been closed.");
}
Err(E) => {}
}
// sleep as much as the loop recurs per 60 seconds, expecting child threads will have finished within 60 seconds.
elapsed_time = instant.elapsed().as_millis();
if 60_000 > elapsed_time {
//60_000
sleep(Duration::from_millis((60_000 - elapsed_time) as u64)).await;
}
}
});
// Task#8: fetch candle 1d
tokio::task::spawn(async move {
let mut elapsed_time = 0;
let interval = String::from("1d");
sleep(Duration::from_secs(600)).await;
loop {
let instant = Instant::now();
let mut candle_1d_vec_temp: Vec<(String, Vec<CandleData>)> = Vec::new();
let result =
request_candles::fetch_candle_delay(&interval, &mut candle_1d_vec_temp).await;
match result {
Ok(T) => {
tx_candle_1d_vec.send_modify(|vec| *vec = candle_1d_vec_temp);
tx_task8.send(8).expect("The mpsc channel has been closed.");
}
Err(E) => {}
}
// sleep as much as the loop recurs per 1800 seconds, expecting child threads will have finished within 1800 seconds.
elapsed_time = instant.elapsed().as_secs();
if 1_800 > elapsed_time {
sleep(Duration::from_secs((1_800 - elapsed_time) as u64)).await;
}
}
});
// // Task#9: fetch candle 1w
// tokio::task::spawn(async move{
// let interval = String::from("1w");
// sleep(Duration::from_secs(600)).await;
// loop{
// let mut candle_1w_vec_temp: Vec<(String, Vec<CandleData>)> = Vec::new();
// let result = request_candles::fetch_candle_delay(&interval, &mut candle_1w_vec_temp).await;
// match result {
// Ok(T) => {
// tx_candle_1w_vec.send_modify(|vec| *vec = candle_1w_vec_temp);
// tx_task9.send(9).expect("The mpsc channel has been closed.");
// }
// Err(E) => {}
// }
// }
// });
// // Task#10: fetch candle 1mon
// tokio::task::spawn(async move{
// let interval = String::from("1mon");
// sleep(Duration::from_secs(600)).await;
// loop{
// let mut candle_1mon_vec_temp: Vec<(String, Vec<CandleData>)> = Vec::new();
// let result = request_candles::fetch_candle_delay(&interval, &mut candle_1mon_vec_temp).await;
// match result {
// Ok(T) => {
// tx_candle_1mon_vec.send_modify(|vec| *vec = candle_1mon_vec_temp);
// tx_task10.send(10).expect("The mpsc channel has been closed.");
// }
// Err(E) => {}
// }
// }
// });
// Task#11: monitoring total market cap
if RUNNING_MODE == REAL || RUNNING_MODE == SIMUL || RUNNING_MODE == TEST {
tokio::task::spawn(async move {
loop {
let result = signal_association::coinmarketcap::market_cap_index().await;
match result {
Ok(T) => {
tx_task11
.send(11)
.expect("The mpsc channel has been closed.");
}
Err(E) => {}
}
}
});
}
// Task#12: monitoring foreign exchange rate
if RUNNING_MODE == REAL || RUNNING_MODE == SIMUL {
tokio::task::spawn(async move {
loop {
let result = signal_association::exchange_rate::monitoring_fx_rate_index().await;
match result {
Ok(T) => {
tx_task12
.send(12)
.expect("The mpsc channel has been closed.");
}
Err(E) => {}
}
sleep(Duration::from_secs(2700)).await; // sleep for 45mins
}
});
}
// // // Task#13: monitoring dollar index
// // tokio::task::spawn(async move {
// // loop
// // {
// // let result = signal_association::dollar_index::monitoring_dollar_index().await;
// // match result {
// // Ok(T) => {
// // tx_task13.send(13).expect("The mpsc channel has been closed.");
// // }
// // Err(E) => {}
// // }
// // sleep(Duration::from_secs(1800)).await; // sleep for 30mins
// // }
// // });
// Task#14: monitoring signal decision
if RUNNING_MODE == REAL || RUNNING_MODE == SIMUL || RUNNING_MODE == TEST {
tokio::task::spawn(async move {
loop {
let result =
signal_association::signal_decision::monitoring_signal_decision().await;
match result {
Ok(T) => {
tx_task14
.send(14)
.expect("The mpsc channel has been closed.");
}
Err(E) => {}
}
sleep(Duration::from_millis(500)).await; // sleep for 0.5sec
}
});
}
// Task#15: monitoring future ratio
if RUNNING_MODE == REAL || RUNNING_MODE == SIMUL || RUNNING_MODE == TEST {
tokio::task::spawn(async move {
loop {
let result = signal_association::future_ratio::monitoring_future_ratio().await;
match result {
Ok(T) => {
tx_task15
.send(15)
.expect("The mpsc channel has been closed.");
}
Err(E) => {}
}
sleep(Duration::from_secs(60)).await; // sleep for 1min
}
});
}
// COEX part
// Task#16: execute strategis for buy
if RUNNING_MODE == REAL || RUNNING_MODE == SIMUL {
tokio::task::spawn(async move {
sleep(Duration::from_secs(40)).await;
let mut all_data = AllData::new();
let mut elapsed_time = 0;
loop {
let instant = Instant::now();
all_data.valid_symbol_vec = rx3_valid_usdt_trade_vec.borrow().clone();
// realtime price data
all_data.rt_price_1m_vec = rx3_rt_price_1m_vec.borrow().clone();
all_data.rt_price_30m_vec = rx3_rt_price_30m_vec.borrow().clone();
all_data.rt_price_1d_vec = rx3_rt_price_1d_vec.borrow().clone();
all_data.rt_price_1w_vec = rx3_rt_price_1w_vec.borrow().clone();
all_data.rt_price_1mon_vec = rx3_rt_price_1mon_vec.borrow().clone();
let result =
strategy_team::strategy_manager::execute_list_up_for_buy(&all_data).await;
match result {
Ok(T) => {
tx_task16
.send(16)
.expect("The mpsc channel has been closed.");
}
Err(E) => {
// eprintln!("Couldn't execute strategists.");
}
}
// sleep as much as the loop recurs per 1 second if all operation finished within 1 second.
elapsed_time = instant.elapsed().as_millis();
if 250 > elapsed_time {
sleep(Duration::from_millis((250 - elapsed_time) as u64)).await;
}
}
});
} else {
tokio::task::spawn(async move {
sleep(Duration::from_secs(10)).await;
let mut elapsed_time = 0;
let mut all_data = AllData::new();
loop {
let instant = Instant::now();
all_data.valid_symbol_vec = rx3_valid_usdt_trade_vec.borrow().clone();
// realtime price data
all_data.rt_price_1m_vec = rx3_rt_price_1m_vec.borrow().clone();
all_data.rt_price_30m_vec = rx3_rt_price_30m_vec.borrow().clone();
all_data.rt_price_1d_vec = rx3_rt_price_1d_vec.borrow().clone();
all_data.rt_price_1w_vec = rx3_rt_price_1w_vec.borrow().clone();
all_data.rt_price_1mon_vec = rx3_rt_price_1mon_vec.borrow().clone();
// let result = coex::strategy_team::execute_strategist_for_test(&all_data).await;
// match result {
// Ok(T) => {
// tx_task16
// .send(16)
// .expect("The mpsc channel has been closed.");
// }
// Err(E) => {}
// }
// sleep as much as the loop recurs per 1 second if all operation finished within 1 second.
elapsed_time = instant.elapsed().as_millis();
if 250 > elapsed_time {
sleep(Duration::from_millis((250 - elapsed_time) as u64)).await;
}
}
});
}
// Task#17: execute strategis for sell
if RUNNING_MODE == REAL || RUNNING_MODE == SIMUL {
tokio::task::spawn(async move {
sleep(Duration::from_secs(40)).await;
let mut all_data = AllData::new();
let mut exchange_info_vec: Vec<ExchangeInfo> = Vec::new();
let mut trade_fee_vec: Vec<TradeFee> = Vec::new();
let mut elapsed_time = 0;
loop {
let instant = Instant::now();
all_data.valid_symbol_vec = rx4_valid_usdt_trade_vec.borrow().clone();
exchange_info_vec = rx6_exchange_info_data.borrow().clone();
trade_fee_vec = rx5_tradefee_vec.borrow().clone();
// realtime price data
all_data.rt_price_1m_vec = rx5_rt_price_1m_vec.borrow().clone();
all_data.rt_price_30m_vec = rx5_rt_price_30m_vec.borrow().clone();
all_data.rt_price_1d_vec = rx4_rt_price_1d_vec.borrow().clone();
all_data.rt_price_1w_vec = rx4_rt_price_1w_vec.borrow().clone();
all_data.rt_price_1mon_vec = rx4_rt_price_1mon_vec.borrow().clone();
let result = strategy_team::strategy_manager::execute_list_up_for_sell(
&all_data,
&exchange_info_vec,
&trade_fee_vec,
)
.await;
match result {
Ok(T) => {
tx_task17
.send(17)
.expect("The mpsc channel has been closed.");
}
Err(E) => {
// eprintln!("Couldn't execute strategists.");
}
}
// sleep as much as the loop recurs per 1 second if all operation finished within 1 second.
elapsed_time = instant.elapsed().as_millis();
if 250 > elapsed_time {
sleep(Duration::from_millis((250 - elapsed_time) as u64)).await;
}
}
});
}
// Task#18: monitoring pre-suggested coins
tokio::task::spawn(async move {
sleep(Duration::from_secs(30)).await;
let mut elapsed_time = 0;
loop {
let instant = Instant::now();
let coin_price_vec = rx3_price_vec.borrow().clone();
let result = coex::exchange_team::monitoring_pre_suggested_coins(&coin_price_vec).await;
// send Task#0 a message to notify running on
match result {
Ok(T) => {
tx_task18
.send(18)
.expect("The mpsc channel has been closed.");
}
Err(E) => {}
}
// sleep as much as the loop recurs per 1 second if all operation finished within 1 second.
elapsed_time = instant.elapsed().as_millis();
if 100 > elapsed_time {
sleep(Duration::from_millis((100 - elapsed_time) as u64)).await;
}
}
});
// Task#19: buy_coin
if RUNNING_MODE == REAL || RUNNING_MODE == SIMUL || RUNNING_MODE == TEST {
tokio::task::spawn(async move {
sleep(Duration::from_secs(30)).await;
let mut elapsed_time = 0;
loop {
let instant = Instant::now();
let exchange_info_vec = rx_exchange_info_data.borrow().clone();
let trade_fee_vec = rx_tradefee_vec.borrow().clone();
let result =
coex::exchange_team::buy_coin(&exchange_info_vec, &trade_fee_vec).await;
// send Task#0 a message to notify running on
match result {
Ok(T) => {
tx_task19
.send(19)
.expect("The mpsc channel has been closed.");
}
Err(E) => {
eprint!("Error: {:?}", E);
}
}
// sleep as much as the loop recurs per 200ms second if all operation finished within 200ms
elapsed_time = instant.elapsed().as_millis();
if 200 > elapsed_time {
sleep(Duration::from_millis((200 - elapsed_time) as u64)).await;
}
}
});
} else {
tokio::task::spawn(async move {
sleep(Duration::from_secs(15)).await;
let mut elapsed_time = 0;
let instant = Instant::now();
let exchange_info_vec = rx_exchange_info_data.borrow().clone();
let trade_fee_vec = rx_tradefee_vec.borrow().clone();
// let result = coex::exchange_team::buy_coin_for_test(&client, &coin_price_vec, &exchange_info_vec, &trade_fee_vec).await;
let result = coex::exchange_team::buy_coin(&exchange_info_vec, &trade_fee_vec).await;
// send Task#0 a message to notify running on
match result {
Ok(T) => {
tx_task19
.send(19)
.expect("The mpsc channel has been closed.");
}
Err(E) => {}
}
// sleep as much as the loop recurs per 200ms second if all operation finished within 200ms
elapsed_time = instant.elapsed().as_millis();
if 200 > elapsed_time {
sleep(Duration::from_millis((200 - elapsed_time) as u64)).await;
}
});
}
// Task#20: monitoring_open_buy_order
if RUNNING_MODE == REAL || RUNNING_MODE == TEST {
tokio::task::spawn(async move {
if RUNNING_MODE == REAL {
sleep(Duration::from_secs(30)).await;
} else if RUNNING_MODE == TEST {
sleep(Duration::from_secs(10)).await;
}
let client = ClientBuilder::new()
.timeout(tokio::time::Duration::from_millis(3000))
.build()
.unwrap();
let mut elapsed_time = 0;
loop {
let instant = Instant::now();
let exchange_info_vec = rx3_exchange_info_data.borrow().clone();
let trade_fee_vec = rx2_tradefee_vec.borrow().clone();
let result = coex::order_team::monitoring_open_buy_order(
&client,
&exchange_info_vec,
&trade_fee_vec,
)
.await;
// send Task#0 a message to notify running on
match result {
Ok(T) => {
tx_task20
.send(20)
.expect("The mpsc channel has been closed.");
}
Err(E) => {}
}
// sleep as much as the loop recurs per 1 second if all operation finished within 1 second.
elapsed_time = instant.elapsed().as_millis();
if 200 > elapsed_time {
sleep(Duration::from_millis((200 - elapsed_time) as u64)).await;
}
}
});
}
// Task#21: update_price_of_filled_buy_order
tokio::task::spawn(async move {
if RUNNING_MODE == TEST {
sleep(Duration::from_secs(10)).await;
} else {
sleep(Duration::from_secs(30)).await;
}
let mut elapsed_time = 0;
loop {
let instant = Instant::now();
let coin_price_vec = rx5_price_vec.borrow().clone();
let exchange_info_vec = rx2_exchange_info_data.borrow().clone();
let trade_fee_vec = rx3_tradefee_vec.borrow().clone();
let result = coex::order_team::update_price_of_filled_buy_order(
&coin_price_vec,
&exchange_info_vec,
&trade_fee_vec,
)
.await;
// send Task#0 a message to notify running on
match result {
Ok(T) => {
tx_task21
.send(21)
.expect("The mpsc channel has been closed.");
}
Err(E) => {}
}
// sleep as much as the loop recurs per 1 second if all operation finished within 1 second.
elapsed_time = instant.elapsed().as_millis();
if 250 > elapsed_time {
sleep(Duration::from_millis((250 - elapsed_time) as u64)).await;
}
}
});
// Task#22: monitoring_open_sell_order
if RUNNING_MODE == REAL || RUNNING_MODE == TEST {
tokio::task::spawn(async move {
if RUNNING_MODE == REAL {
sleep(Duration::from_secs(30)).await;
} else if RUNNING_MODE == TEST {
sleep(Duration::from_secs(10)).await;
}
let client = ClientBuilder::new()
.timeout(tokio::time::Duration::from_millis(3000))
.build()
.unwrap();
let mut elapsed_time = 0;
loop {
let instant = Instant::now();
let exchange_info_vec = rx4_exchange_info_data.borrow().clone();
let trade_fee_vec = rx4_tradefee_vec.borrow().clone();
let result = coex::order_team::monitoring_open_sell_order(
&client,
&exchange_info_vec,
&trade_fee_vec,
)
.await;
// send Task#0 a message to notify running on
match result {
Ok(T) => {
tx_task22
.send(22)
.expect("The mpsc channel has been closed.");
}
Err(E) => {}
}
// sleep as much as the loop recurs per 1 second if all operation finished within 1 second.
elapsed_time = instant.elapsed().as_millis();
if 250 > elapsed_time {
sleep(Duration::from_millis((250 - elapsed_time) as u64)).await;
}
}
});
}
// Task#23: monitoring_filled_sell_order
tokio::task::spawn(async move {
let client = ClientBuilder::new()
.timeout(tokio::time::Duration::from_millis(3000))
.build()
.unwrap();
let mut elapsed_time = 0;
loop {
let instant = Instant::now();
let result = coex::order_team::monitoring_filled_sell_order(&client).await;
// send Task#0 a message to notify running on
match result {
Ok(T) => {
tx_task23
.send(23)
.expect("The mpsc channel has been closed.");
}
Err(E) => {}
}
// sleep as much as the loop recurs per 1 second if all operation finished within 1 second.
elapsed_time = instant.elapsed().as_millis();
if 200 > elapsed_time {
sleep(Duration::from_millis((200 - elapsed_time) as u64)).await;
}
}
});
// Task#24: monitoring_scoreboard
tokio::task::spawn(async move {
let mut elapsed_time = 0;
let mut all_data = AllData::new();
loop {
let instant = Instant::now();
// realtime price data
all_data.rt_price_1m_vec = rx4_rt_price_1m_vec.borrow().clone();
let result = coex::exchange_team::monitoring_scoreboard(&all_data).await;
// send Task#0 a message to notify running on
match result {
Ok(T) => {
tx_task24
.send(24)
.expect("The mpsc channel has been closed.");
}
Err(E) => {}
}
// sleep as much as the loop recurs per 1 second if all operation finished within 1 second.
elapsed_time = instant.elapsed().as_millis();
if 200 > elapsed_time {
sleep(Duration::from_millis((200 - elapsed_time) as u64)).await;
}
}
});
// Task#25: update current_total_usdt and available_usdt
tokio::task::spawn(async move {
let client = ClientBuilder::new()
.timeout(tokio::time::Duration::from_millis(3000))
.build()
.unwrap();
let mut previous_result = false;
let mut elapsed_time = 0;
loop {
let instant = Instant::now();
let result =
coex::assets_managing_team::monitoring_asset_usdt(&mut previous_result, &client)
.await;
// send Task#0 a message to notify running on
match result {
Ok(T) => {
tx_task25
.send(25)
.expect("The mpsc channel has been closed.");
}
Err(E) => {
println!("{}", E);
}
}
// sleep as much as the loop recurs per 300ms if all operation finished within 300ms.
elapsed_time = instant.elapsed().as_millis();
if 250 > elapsed_time {
sleep(Duration::from_millis((250 - elapsed_time) as u64)).await;
}
}
});
// Task#26: update kelly_criterion
tokio::task::spawn(async move {
let mut elapsed_time = 0;
loop {
let instant = Instant::now();
let result = coex::assets_managing_team::update_kelly_criterion().await;
// send Task#0 a message to notify running on
match result {
Ok(T) => {
tx_task26
.send(26)
.expect("The mpsc channel has been closed.");
}
Err(E) => {
println!("{}", E);
}
}
// sleep as much as the loop recurs per 1 minutes
elapsed_time = instant.elapsed().as_secs();
if 60 > elapsed_time {
sleep(Duration::from_secs((60 - elapsed_time) as u64)).await;
}
}
});
loop {
// println!("test limit order 실행");
// let client = ClientBuilder::new().timeout(Duration::from_millis(1000)).build().unwrap();
// limit_order_buy_test(&client).await;
sleep(Duration::from_secs(300000000000)).await;
}
Ok(())
}
// #![allow(unused)]
// #![allow(warnings)]
// mod assets_managing_team;
// mod database_control;
// mod value_estimation_team;
// mod plotting_team;
// mod request_test;
// use std::sync::{Arc};
// use reqwest::{Client, ClientBuilder};
// use tokio::{task::*, time::*, sync::watch, join, fs::*};
// use sqlx::{ mysql::*, Executor, Connection, FromRow, Row };
// pub const DB_URL: &str = "mysql://root:Durtkarovh23!@localhost:3306/tradingbot";
// #[tokio::main]
// async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>>{
// let (tx1, mut rx1) = watch::channel(0);
// let mut rx2 = rx1.clone();
// let mut cnt = 0;
// tokio::task::spawn(async move{
// loop{
// println!("Task #1: {}", cnt);
// tx1.send(cnt).expect("tx1-rx1 channel has been closed.");
// cnt += 1;
// sleep(Duration::from_secs(1)).await;
// }
// });
// tokio::task::spawn(async move{
// loop{
// let tx1_changed = rx1.changed().await;
// match tx1_changed {
// Ok(T) => {
// println!(" Task #2: {:?}", rx1.borrow());
// }
// Err(E) => {
// println!(">>> Failed to fetch tradefee data from endpoint or parse message.");
// }
// }
// }
// });
// tokio::task::spawn(async move{
// loop{
// let tx1_changed = rx2.changed().await;
// match tx1_changed {
// Ok(T) => {
// println!(" Task #3: {:?}", rx2.borrow());
// }
// Err(E) => {
// println!(">>> Failed to fetch tradefee data from endpoint or parse message.");
// }
// }
// println!("what");
// }
// });
// // let client = ClientBuilder::new().connect_timeout(tokio::time::Duration::from_millis(1200)).build().unwrap();
// loop{
// // request_test::request_candlestick_data(&client).await;
// // plotting_team::plot_dp_mean_open_close_candle().await;
// // value_estimation_team::mean_price_for24h_candle_30m().await;
// // let job2 =
// // let job3 =
// // join!(job1);
// // let job4 = tokio::task::spawn(assets_managing_team::request_wallet_info(api_key, secret_key, time.return_from_local_epoch(), time.return_difference_epoch()));
// // tokio::join!(j1, j2, j3, j4, j5);
// println!("main ended!");
// sleep(Duration::from_secs(6)).await;
// }
// Ok(())
// }
// // use plotters::prelude::*;
// // fn main() -> Result<(), Box<dyn std::error::Error>> {
// // let root = BitMapBackend::new("plotters-doc-data/0.png", (640, 480)).into_drawing_area();
// // root.fill(&WHITE)?;
// // let mut chart = ChartBuilder::on(&root)
// // .caption("y=x^2", ("sans-serif", 50).into_font())
// // .margin(5)
// // .x_label_area_size(30)
// // .y_label_area_size(30)
// // .build_cartesian_2d(-1f32..1f32, -0.1f32..1f32)?;
// //
// // chart.configure_mesh().draw()?;
// //
// // chart
// // .draw_series(LineSeries::new(
// // (-50..=50).map(|x| x as f32 / 50.0).map(|x| (x, x * x)),
// // &RED,
// // ))?
// // .label("y = x^2")
// // .legend(|(x, y)| PathElement::new(vec![(x, y), (x + 20, y)], &RED));
// //
// // chart
// // .configure_series_labels()
// // .background_style(&WHITE.mix(0.8))
// // .border_style(&BLACK)
// // .draw()?;
// //
// // Ok(())
// // }