Застосування Обміну Повідомлень для Передавання Даних між Потоками
Одним із набираючих популярність підходів для забезпечення безпечної конкурентності є обмін повідомленнями, коли потоки або ж актори комунікують надсилаючи один одному повідомлення, що містять дані. Ось основна ідея, виражена в слогані з [документації мови програмування Go](https://go. dev/doc/effective_go#concurrency): "Не комунікуйте за допомогою спільної памʼяті; замість цього, діліться памʼяттю комунікуючи."
Для досягнення конкурентності за допомогою обміну повідомленнями, стандартна бібліотека Rust надає імплементацію каналів. Канал - це загальна концепція програмування, основна ідея якої полягає в тому, що дані надсилаються з одного потоку в інший.
Ви можете уявити канал в програмуванні схожим на напрямлений канал води, такий як струмок чи річка. Якщо ви помістите щось на кшталт гумової качечки в річку, вона попливе вниз за течією аж до кінця водного шляху.
Канал має дві половини: передавач (transmitter) і отримувач (receiver). Передавач - це місце, де ви пускаєте за течією гумових качечок, а отримувач - це місце куди гумова качечка потрапляє в кінці течії. Одна частина вашого коду викликає методи передавача з даними, які ви хочете відправити, а інша частина перевіряє отримувач на наявність отриманих повідомлень. Канал вважається закритим якщо передавач або ж отримувач були видалені.
Надалі, ми попрацюємо над програмою, яка має один потік для генерації значень і надсилання їх по каналу, а інший потік отримуватиме значення і виводитиме їх на екран. Ми відправлятимемо прості значення між потоками використовуючи канали для ілюстрації даного функціоналу. Як тільки ви познайомитесь з даним підходом, ви зможете використовувати канали для будь-яких потоків, яким потрібно комунікувати між собою, таким як чат-системи або ж системи, де багато потоків виконують частини обчислень і надсилають результати в один потік, котрий агрегує ці результати.
Спочатку, в Блоці коду 16-6, ми створимо канал, але не будемо нічого з ним робити. Зверніть увагу, що даний приклад поки що не скомпілюється, оскільки Rust не може визначити які типи значень ми хочемо надсилати через канал.
Файл: src/main.rs
use std::sync::mpsc;
fn main() {
let (tx, rx) = mpsc::channel();
}
Ми створюємо новий канал за допомогою функції mpsc::channel
, mpsc
означає multiple producer, single consumer (декілька виробників, один споживач). Словом, спосіб, в який стандартна бібліотека Rust імплементує канали означає, що канал може мати декілька відправляючих кінців, які створюють значення, але лише один споживаючий кінець, який споживає значення. Уявіть декілька струмків, що зливаються в одну велику річку: все що буде відправлено за течією будь-якого з струмків в кінці-кінців потрапить в річку. Наразі ми почнемо з одного виробника, але ми додамо ще декілька коли змусимо цей приклад працювати.
Функція mpsc::channel
повертає кортеж, першим елементом якого є відправляючий кінець - передавач, а другим елементом є отримуючий кінець - отримувач. Абревіатури tx
і rx
традиційно використовуються в багатьох сферах для позначення передавача (transmitter) та отримувача (receiver) відповідно, тому ми називаємо наші змінні таким чином, щоб позначити відповідні кінці каналу. Ми використовуємо інструкцію let
з шаблоном, що деструктуризує кортежі; ми обговоримо використання шаблонів в інструкціях let
та деструктуризацію в Розділі 18. Наразі ж знайте, що використання інструкції let
в такий спосіб є зручним підходом для витягування (extract) частин кортежу, який повертає після виконання mpsc::channel
.
Давайте перемістимо передавач в створений потік і попросимо його надіслати одну стрічку, щоб даний потік комунікував з основним потоком, як показано в Блоці коду 16-7. Це як помістити гумову качечку в річку вище за течією або ж надіслати чат-повідомлення з одного потоку в інший.
Файл: src/main.rs
use std::sync::mpsc; use std::thread; fn main() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { let val = String::from("hi"); tx.send(val).unwrap(); }); }
Знову ж таки, ми використовуємо thread::spawn
щоб створити новий потік і потім використовуємо move
щоб помістити tx
всередину замикання, адже таким чином потік володітиме tx
. Створений потік має володіти передавачем, щоб мати можливість надсилати повідомлення по каналу. Передавач має метод send
, котрий приймає значення, яке ми хочемо надіслати. Метод send
повертає Result<T, E>
, отже, якщо отримувач був видалений й немає куди надіслати значення, операція поверне помилку. В даному прикладі, ми викликаємо unwrap
, щоб наш код запанікував у випадку помилки. Однак в справжньому додатку ми б обробили помилки належним чином: поверніться до Розділу 9 щоб переглянути стратегії належної обробки помилок.
В Блоці коду 16-8 ми в основному потоці отримаємо/дістанемо значення з отримувача. Це як дістати гумову качечку з води в кінці річки або ж отримати повідомлення в чаті.
Файл: src/main.rs
use std::sync::mpsc; use std::thread; fn main() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { let val = String::from("hi"); tx.send(val).unwrap(); }); let received = rx.recv().unwrap(); println!("Got: {}", received); }
Отримувач має два корисні методі: recv
та try_recv
. Ми використовуємо recv
, скорочено від receive, який заблокує виконання основного потоку і чекатиме доки значення буде надіслане в канал. Як тільки значення буде надіслане, recv
поверне його, обернувши в Result<T, E>
. Коли передавач закриється, recv
поверне помилку, яка сигналізує про те, що значення більше не надходитимуть.
Метод try_recv
не блокує основний потік, а натомість одразу повертає Result<T, E>
: значення Ok
, котре містить повідомлення, якщо воно доступне, і Err
якщо цього разу немає жодних повідомлень. Використання try_recv
корисне якщо потік має виконувати іншу роботу, очікуючи на повідомлення: ми можемо написати цикл, котрий періодично викликає try_recv
, обробляє повідомлення, якщо воно доступне, а в іншому випадку деякий час виконує іншу роботу аж до наступної перевірки.
Ми використали recv
в цьому прикладі для простоти; ми не маємо жодної іншої роботи для основного потоку, окрім очікування повідомлень, тому блокування основного потоку є доречним/виправданим.
Коли ми запустимо код з Блоку коду 16-8, ми побачимо, що значення виводиться з основного потоку:
Got: hi
Ідеально!
Канали та Передача Володіння
Правила володіння відіграють важливу роль в обміні повідомленнями, оскільки вони допомагають вам писати безпечний конкурентний код. Запобігання помилкам в конкурентних програмах - це перевага, яку надає мислення в термінах володіння в ваших Rust програмах. Давайте проведемо експеримент для демонстрації того як канали та володіння працюють разом для запобігання проблемам: ми спробуємо використати значення val
в створеному потоці вже після того, як ми надіслали його далі по каналу. Спробуйте скомпілювати код з Блоку коду 16-9 щоб побачити чому він не пропускається компілятором:
Файл: src/main.rs
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
println!("val is {}", val);
});
let received = rx.recv().unwrap();
println!("Got: {}", received);
}
Тут ми намагаємось вивести val
на екран вже після того як ми надіслали його по каналу за допомогою tx.send
. Дозволяти таке було б поганою ідеєю: як тільки значення буде надіслане в інший потік, такий потік може модифікувати або ж навіть видалити значення, перш ніж ми спробуємо використати його знову. Потенційно, зміни в іншому потоці можуть привести до помилок або ж неочікуваних результатів через суперечливі (inconsistent) або ж неіснуючі дані. Однак, Rust видасть помилку якщо ми спробуємо скомпілювати код з Блоку коду 16-9:
$ cargo run
Compiling message-passing v0.1.0 (file:///projects/message-passing)
error[E0382]: borrow of moved value: `val`
--> src/main.rs:10:31
|
8 | let val = String::from("hi");
| --- move occurs because `val` has type `String`, which does not implement the `Copy` trait
9 | tx.send(val).unwrap();
| --- value moved here
10 | println!("val is {}", val);
| ^^^ value borrowed here after move
|
= note: this error originates in the macro `$crate::format_args_nl` which comes from the expansion of the macro `println` (in Nightly builds, run with -Z macro-backtrace for more info)
For more information about this error, try `rustc --explain E0382`.
error: could not compile `message-passing` due to previous error
Наша помилка в роботі з конкурентністю спричинила помилку компіляції. Функція send
бере володіння над своїм параметром, а коли значення переміщується (moved), отримувач бере над ним володіння. Це не дає нам випадково повторно використати значення після того як ми його надіслали; правила володіння перевіряють чи все гаразд.
Відправлення Декількох Значень та Спостереження за Очікуванням Отримувача
Код в Блоці коду 16-8 скомпілювався і виконався, але він не продемонстрував, що два окремі потоки спілкуються між собою через канал. В Блоці коду 16-10 ми зробили деякі зміни, які підтвердять, що код в Блоці коду 16-8 виконується конкурентно: створений потік тепер відсилатиме декілька повідомлень і робитиме секундну паузу між кожним повідомленням.
Файл: src/main.rs
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for received in rx {
println!("Got: {}", received);
}
}
Цього разу створений потік має вектор стрічок, які ми хочемо надіслати в основний потік. Ми ітеруємось по ним, надсилаючи кожну стрічку окремо, і робимо паузу між кожною відправкою, викликаючи функцію thread::sleep
із значенням Duration
в 1 секунду.
В основному потоці, ми більше не викликаємо функцію recv
явно: замість цього ми розглядаємо rx
як ітератор. Отримуючи значення, ми виводимо його на екран. Якщо канал закриється, ітерування припиниться.
Під час виконання коду із Блоку коду 16-10, ви маєте побачити наступний вивід із 1-секундною паузою між кожним рядком:
Got: hi
Got: from
Got: the
Got: thread
Оскільки ми не маємо жодного коду, що призупиняє або ж відкладає виконання циклу for
в основному потоці, ми можемо сказати, що основний потік очікує отримання значень із створеного потоку.
Створення Декількох Виробників із Клонуванням Передавача
Раніше ми вже згадували, що mpsc
- це абревіатура для multiple producer, single consumer (кілька виробників, один споживач). Давайте використаємо mpsc
і розширимо код в Блоці коду 16-10 щоб створити кілька потоків, котрі надсилають дані одному й тому ж отримувачу. Ми можемо зробити це склонувавши передавач, як показано в Блоці коду 16-11:
Файл: src/main.rs
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
// --snip--
let (tx, rx) = mpsc::channel();
let tx1 = tx.clone();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx1.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
thread::spawn(move || {
let vals = vec![
String::from("more"),
String::from("messages"),
String::from("for"),
String::from("you"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for received in rx {
println!("Got: {}", received);
}
// --snip--
}
Цього разу, перед тим як ми створимо перший потік, ми викликаємо clone
на передавачі. Це дасть нам новий передавач, який ми зможемо потім передати в створений потік. Ми передаємо оригінальний передавач в другий створений потік. Це дає нам два потоки, кожен з яких надсилає різні повідомленнями одному отримувачу.
Коли ви виконаєте код, ваш вивід має виглядати приблизно так:
Got: hi
Got: more
Got: from
Got: messages
Got: for
Got: the
Got: thread
Got: you
Ви можете бачити значення в іншому порядку, залежно від вашої системи. Саме це робить конкурентність цікавою, але й складною одночасно. Якщо ви поекспериментуєте з thread::sleep
, підставляючи різні значення в різні потоки, кожен запуск буде ще більш недетермінованим і щоразу створюватиме різний вивід.
Тепер, коли ми поглянули на те, як працюють канали, давайте розглянемо інший метод конкурентності.