Використання обміну повідомленнями для передачі данних між потоками

Одним із набираючих популярність підходів для забезпечення безпечної конкурентності є обмін повідомленнями, коли потоки або ж актори комунікують надсилаючи один одному повідомлення, що містять дані. Ось основна ідея, виражена в слогані з [документації мови програмування Go](https://go. dev/doc/effective_go#concurrency): "Не комунікуйте за допомогою спільної памʼяті; замість цього, діліться памʼяттю комунікуючи."

Для досягнення конкурентності за допомогою обміну повідомленнями, стандартна бібліотека Rust надає імплементацію каналів. Канал - це загальна концепція програмування, основна ідея якої полягає в тому, що дані надсилаються з одного потоку в інший.

Ви можете уявити канал в програмуванні схожим на напрямлений канал води, такий як струмок чи річка. Якщо ви помістите щось на кшталт гумової качечки в річку, вона попливе вниз за течією аж до кінця водного шляху.

Канал має дві половини: передавач (transmitter) і отримувач (receiver). Передавач - це місце, де ви пускаєте за течією гумових качечок, а отримувач - це місце куди гумова качечка потрапляє в кінці течії. Одна частина вашого коду викликає методи передавача з даними, які ви хочете відправити, а інша частина перевіряє отримувач на наявність отриманих повідомлень. Канал вважається закритим якщо передавач або ж отримувач були видалені.

Надалі, ми попрацюємо над програмою, яка має один потік для генерації значень і надсилання їх по каналу, а інший потік отримуватиме значення і виводитиме їх на екран. Ми відправлятимемо прості значення між потоками використовуючи канали для ілюстрації даного функціоналу. Як тільки ви познайомитесь з даним підходом, ви зможете використовувати канали для будь-яких потоків, яким потрібно комунікувати між собою, таким як чат-системи або ж системи, де багато потоків виконують частини обчислень і надсилають результати в один потік, котрий агрегує ці результати.

Спочатку, в Блоці коду 16-6, ми створимо канал, але не будемо нічого з ним робити. Зверніть увагу, що даний приклад поки що не скомпілюється, оскільки Rust не може визначити які типи значень ми хочемо надсилати через канал.

Файл: src/main.rs

use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();
}

Блок коду 16-6: Створення каналу і присвоєння двох його половин в tx і rx

Ми створюємо новий канал за допомогою функції mpsc::channel, mpsc означає multiple producer, single consumer (декілька виробників, один споживач). Словом, спосіб, в який стандартна бібліотека Rust імплементує канали означає, що канал може мати декілька відправляючих кінців, які створюють значення, але лише один споживаючий кінець, який споживає значення. Уявіть декілька струмків, що зливаються в одну велику річку: все що буде відправлено за течією будь-якого з струмків в кінці-кінців потрапить в річку. Наразі ми почнемо з одного виробника, але ми додамо ще декілька коли змусимо цей приклад працювати.

Функція mpsc::channel повертає кортеж, першим елементом якого є відправляючий кінець - передавач, а другим елементом є отримуючий кінець - отримувач. Абревіатури tx і rx традиційно використовуються в багатьох сферах для позначення передавача (transmitter) та отримувача (receiver) відповідно, тому ми називаємо наші змінні таким чином, щоб позначити відповідні кінці каналу. Ми використовуємо інструкцію let з шаблоном, що деструктуризує кортежі; ми обговоримо використання шаблонів в інструкціях let та деструктуризацію в Розділі 18. Наразі ж знайте, що використання інструкції let в такий спосіб є зручним підходом для витягування (extract) частин кортежу, який повертає після виконання mpsc::channel.

Давайте перемістимо передавач в створений потік і попросимо його надіслати одну стрічку, щоб даний потік комунікував з основним потоком, як показано в Блоці коду 16-7. Це як помістити гумову качечку в річку вище за течією або ж надіслати чат-повідомлення з одного потоку в інший.

Файл: src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });
}

Блок коду 16-7: Переміщення tx в створений потік і відправка "hi"

Знову ж таки, ми використовуємо thread::spawn щоб створити новий потік і потім використовуємо move щоб помістити tx всередину замикання, адже таким чином потік володітиме tx. Створений потік має володіти передавачем, щоб мати можливість надсилати повідомлення по каналу. Передавач має метод send, котрий приймає значення, яке ми хочемо надіслати. Метод send повертає Result<T, E>, отже, якщо отримувач був видалений й немає куди надіслати значення, операція поверне помилку. В даному прикладі, ми викликаємо unwrap, щоб наш код запанікував у випадку помилки. Однак в справжньому додатку ми б обробили помилки належним чином: поверніться до Розділу 9 щоб переглянути стратегії належної обробки помилок.

В Блоці коду 16-8 ми в основному потоці отримаємо/дістанемо значення з отримувача. Це як дістати гумову качечку з води в кінці річки або ж отримати повідомлення в чаті.

Файл: src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });

    let received = rx.recv().unwrap();
    println!("Got: {}", received);
}

Блок коду 16-8: Отримання значення "hi" в основному потоці та вивід його на екран

Отримувач має два корисні методі: recv та try_recv. Ми використовуємо recv, скорочено від receive, який заблокує виконання основного потоку і чекатиме доки значення буде надіслане в канал. Як тільки значення буде надіслане, recv поверне його, обернувши в Result<T, E>. Коли передавач закриється, recv поверне помилку, яка сигналізує про те, що значення більше не надходитимуть.

Метод try_recv не блокує основний потік, а натомість одразу повертає Result<T, E>: значення Ok, котре містить повідомлення, якщо воно доступне, і Err якщо цього разу немає жодних повідомлень. Використання try_recv корисне якщо потік має виконувати іншу роботу, очікуючи на повідомлення: ми можемо написати цикл, котрий періодично викликає try_recv, обробляє повідомлення, якщо воно доступне, а в іншому випадку деякий час виконує іншу роботу аж до наступної перевірки.

Ми використали recv в цьому прикладі для простоти; ми не маємо жодної іншої роботи для основного потоку, окрім очікування повідомлень, тому блокування основного потоку є доречним/виправданим.

Коли ми запустимо код з Блоку коду 16-8, ми побачимо, що значення виводиться з основного потоку:

Got: hi

Ідеально!

Канали та передача володіння

Правила володіння відіграють важливу роль в обміні повідомленнями, оскільки вони допомагають вам писати безпечний конкурентний код. Запобігання помилкам в конкурентних програмах - це перевага, яку надає мислення в термінах володіння в ваших Rust програмах. Давайте проведемо експеримент для демонстрації того як канали та володіння працюють разом для запобігання проблемам: ми спробуємо використати значення val в створеному потоці вже після того, як ми надіслали його далі по каналу. Спробуйте скомпілювати код з Блоку коду 16-9 щоб побачити чому він не пропускається компілятором:

Файл: src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
        println!("val is {}", val);
    });

    let received = rx.recv().unwrap();
    println!("Got: {}", received);
}

Блок коду 16-9: Спроба використати val після того, як воно було надіслане в канал

Тут ми намагаємось вивести val на екран вже після того як ми надіслали його по каналу за допомогою tx.send. Дозволяти таке було б поганою ідеєю: як тільки значення буде надіслане в інший потік, такий потік може модифікувати або ж навіть видалити значення, перш ніж ми спробуємо використати його знову. Потенційно, зміни в іншому потоці можуть привести до помилок або ж неочікуваних результатів через суперечливі (inconsistent) або ж неіснуючі дані. Однак, Rust видасть помилку якщо ми спробуємо скомпілювати код з Блоку коду 16-9:

$ cargo run
   Compiling message-passing v0.1.0 (file:///projects/message-passing)
error[E0382]: borrow of moved value: `val`
  --> src/main.rs:10:31
   |
8  |         let val = String::from("hi");
   |             --- move occurs because `val` has type `String`, which does not implement the `Copy` trait
9  |         tx.send(val).unwrap();
   |                 --- value moved here
10 |         println!("val is {}", val);
   |                               ^^^ value borrowed here after move
   |
   = note: this error originates in the macro `$crate::format_args_nl` (in Nightly builds, run with -Z macro-backtrace for more info)

For more information about this error, try `rustc --explain E0382`.
error: could not compile `message-passing` due to previous error

Наша помилка в роботі з конкурентністю спричинила помилку компіляції. Функція send бере володіння над своїм параметром, а коли значення переміщується (moved), отримувач бере над ним володіння. Це не дає нам випадково повторно використати значення після того як ми його надіслали; правила володіння перевіряють чи все гаразд.

Відправка декількох значень і спостерігання за очікуванням отримувача

Код в Блоці коду 16-8 скомпілювався і виконався, але він не продемонстрував, що два окремі потоки спілкуються між собою через канал. В Блоці коду 16-10 ми зробили деякі зміни, які підтвердять, що код в Блоці коду 16-8 виконується конкурентно: створений потік тепер відсилатиме декілька повідомлень і робитиме секундну паузу між кожним повідомленням.

Файл: src/main.rs

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {}", received);
    }
}

Блок коду 16-10: Відправка декількох повідомлень із паузою між відправками

Цього разу створений потік має вектор стрічок, які ми хочемо надіслати в основний потік. Ми ітеруємось по ним, надсилаючи кожну стрічку окремо, і робимо паузу між кожною відправкою, викликаючи функцію thread::sleep із значенням Duration в 1 секунду.

В основному потоці, ми більше не викликаємо функцію recv явно: замість цього ми розглядаємо rx як ітератор. Отримуючи значення, ми виводимо його на екран. Якщо канал закриється, ітерування припиниться.

Під час виконання коду із Блоку коду 16-10, ви маєте побачити наступний вивід із 1-секундною паузою між кожним рядком:

Got: hi
Got: from
Got: the
Got: thread

Оскільки ми не маємо жодного коду, що призупиняє або ж відкладає виконання циклу for в основному потоці, ми можемо сказати, що основний потік очікує отримання значень із створеного потоку.

Створення декількох виробників шляхом клонування передавача

Раніше ми вже згадували, що mpsc - це абревіатура для multiple producer, single consumer (кілька виробників, один споживач). Давайте використаємо mpsc і розширимо код в Блоці коду 16-10 щоб створити кілька потоків, котрі надсилають дані одному й тому ж отримувачу. Ми можемо зробити це склонувавши передавач, як показано в Блоці коду 16-11:

Файл: src/main.rs

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    // --snip--

    let (tx, rx) = mpsc::channel();

    let tx1 = tx.clone();
    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    thread::spawn(move || {
        let vals = vec![
            String::from("more"),
            String::from("messages"),
            String::from("for"),
            String::from("you"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {}", received);
    }

    // --snip--
}

Блок коду 16-11: Відправка декількох повідомлень з кількох виробників (producers)

Цього разу, перед тим як ми створимо перший потік, ми викликаємо clone на передавачі. Це дасть нам новий передавач, який ми зможемо потім передати в створений потік. Ми передаємо оригінальний передавач в другий створений потік. Це дає нам два потоки, кожен з яких надсилає різні повідомленнями одному отримувачу.

Коли ви виконаєте код, ваш вивід має виглядати приблизно так:

Got: hi
Got: more
Got: from
Got: messages
Got: for
Got: the
Got: thread
Got: you

Ви можете бачити значення в іншому порядку, залежно від вашої системи. Саме це робить конкурентність цікавою, але й складною одночасно. Якщо ви поекспериментуєте з thread::sleep, підставляючи різні значення в різні потоки, кожен запуск буде ще більш недетермінованим і щоразу створюватиме різний вивід.

Тепер, коли ми поглянули на те, як працюють канали, давайте розглянемо інший метод конкурентності.