reduce likelihood of downloaded data counter overflowing
commit 83cf933755
parent ebcd23a56e
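For context on the overflow this guards against, a rough back-of-the-envelope check (a standalone sketch, not part of the commit): a byte counter held in an AtomicU32 wraps after u32::MAX bytes, roughly 4 GiB of downloaded data, while a kibibyte counter held in an AtomicU64 only wraps after about 16 ZiB.

fn main() {
    // Old counter: bytes in an AtomicU32 -> wraps after u32::MAX bytes (~4 GiB).
    let old_wrap_bytes: u64 = u32::MAX as u64;               // 4_294_967_295
    // New counter: kibibytes in an AtomicU64 -> wraps after u64::MAX KiB (~16 ZiB).
    let new_wrap_bytes: u128 = (u64::MAX as u128) * 1024;
    println!("old wrap point: {old_wrap_bytes} bytes");
    println!("new wrap point: {new_wrap_bytes} bytes");
}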
@@ -1,4 +1,4 @@
-use std::{io, sync::{Arc, atomic::{AtomicU32, Ordering, AtomicBool, AtomicUsize}}, time::Duration};
+use std::{io, sync::{Arc, atomic::{AtomicU32, Ordering, AtomicBool, AtomicUsize, AtomicU64}}, time::Duration};
 use futures_util::{StreamExt, future::{self, join_all}};
 use tui::{backend::CrosstermBackend, Terminal};
 use tokio::{sync::{mpsc, Mutex}, task::JoinHandle};
@@ -19,7 +19,7 @@ pub struct State {
     num_tasks: Arc<AtomicUsize>,
     num_connections_open: Arc<AtomicU32>,
     num_tasks_errored: Arc<AtomicU32>,
-    num_bytes_downloaded: Arc<AtomicU32>,
+    num_kibibytes_downloaded: Arc<AtomicU64>,
     shutting_down: Arc<AtomicBool>,
     urls: Vec<String>,
     log_send: mpsc::Sender<String>,
@@ -52,7 +52,7 @@ async fn async_main() {
         num_tasks: Default::default(),
         num_connections_open: Default::default(),
         num_tasks_errored: Default::default(),
-        num_bytes_downloaded: Default::default(),
+        num_kibibytes_downloaded: Default::default(),
         shutting_down: Default::default(),
         urls,
         log_send
@@ -175,7 +175,7 @@ async fn download_task_inner(state: &State, load_id: usize, url: String) -> anyh
     while load_id < state.num_requests.load(Ordering::Relaxed) && !state.shutting_down.load(Ordering::Relaxed) {
         match stream.next().await {
             Some(Ok(bytes)) => {
-                state.num_bytes_downloaded.fetch_add(bytes.len().try_into()?, Ordering::Relaxed);
+                state.num_kibibytes_downloaded.fetch_add((bytes.len()/1024).try_into()?, Ordering::Relaxed);
             },
             Some(Err(e)) => {
                 log_send.send(format!("Task id {} errored while reading from {}: {}", load_id, &url, e)).await?;
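A standalone sketch of the new per-chunk accounting (not the commit's code; the chunk sizes are made up): each chunk contributes its length in whole kibibytes, so the integer division drops the sub-KiB remainder of every chunk, and chunks smaller than 1 KiB do not advance the counter at all.

use std::sync::atomic::{AtomicU64, Ordering};

fn main() {
    let num_kibibytes_downloaded = AtomicU64::new(0);
    // Pretend three chunks of these sizes (in bytes) arrived from the stream.
    for chunk_len in [1500_usize, 800, 4096] {
        num_kibibytes_downloaded.fetch_add((chunk_len / 1024) as u64, Ordering::Relaxed);
    }
    // 1500/1024 = 1, 800/1024 = 0, 4096/1024 = 4 -> 5 KiB counted for 6396 bytes received.
    assert_eq!(num_kibibytes_downloaded.load(Ordering::Relaxed), 5);
}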
@@ -118,7 +118,7 @@ fn ui<B: Backend>(f: &mut Frame<B>, ui_state: &UiState) {
         ui_state.state.num_tasks.load(Ordering::Relaxed),
         ui_state.state.num_connections_open.load(Ordering::Relaxed),
         ui_state.state.num_tasks_errored.load(Ordering::Relaxed),
-        ByteSize::b(ui_state.state.num_bytes_downloaded.load(Ordering::Relaxed).into()),
+        ByteSize::kib(ui_state.state.num_kibibytes_downloaded.load(Ordering::Relaxed).into()),
     )));
     f.render_widget(Paragraph::new(text), layout[0]);
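On the display side, a small sketch of what ByteSize::kib does with the counter (this assumes the bytesize crate, which the ByteSize identifier suggests; the value is made up): the constructor scales the stored kibibyte count back to bytes, so the UI keeps printing a human-readable size.

use bytesize::ByteSize;

fn main() {
    let kib_counter: u64 = 2048;            // what num_kibibytes_downloaded might hold
    let size = ByteSize::kib(kib_counter);  // 2048 KiB == 2 MiB
    assert_eq!(size.as_u64(), 2048 * 1024); // ByteSize stores plain bytes internally
    println!("{size}");                     // human-readable form used in the UI text
}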