Перетворюємо наш однопотоковий сервер на багатопотоковий

Зараз наш сервер обробляє кожен запит по черзі, тобто він не обробить друге з'єднання, поки не завершить обробку першого. Якщо сервер отримує більше і більше запитів, це послідовне виконання буде все менш і менш оптимальним. Якщо сервер отримує запит, що обробляється довгий час, наступні запити муситимуть чекати, доки довгий запит не буде завершено, навіть якщо нові запити можна обробити швидко. Нам потрібно буде виправити це, але спершу ми поглянемо на цю проблему в дії.

Симуляція повільного запиту в поточній реалізації сервера

Ми подивимося на те, як запит з повільною обробкою може вплинути на інші запити, зроблені до нашої поточної реалізації сервера. Блок коду 20-10 реалізує обробку запиту до /sleep з симуляцією повільної реакції, що заблокує сервер у режимі сну на 5 секунд до відповіді.

Файл: src/main.rs

use std::{
    fs,
    io::{prelude::*, BufReader},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};
// --snip--

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        handle_connection(stream);
    }
}

fn handle_connection(mut stream: TcpStream) {
    // --snip--

    let buf_reader = BufReader::new(&mut stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    // --snip--

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}

Блок коду 20-10: симуляція повільного запиту режимом сну на 5 секунд

Тепер ми перейшли з if на match, бо у нас є три випадки. Ми маємо явно зіставляти слайс з request_line із шаблоном зі стрічковими літералами; match не робить автоматичних посилань і розіменувань, як методи порівняння на рівність.

Перший рукав той же, що й у блоці if з Блоку коду 20-9. Другий рукав зіставляє запит зі /sleep. Коли цей запит отримано, сервер спатиме 5 секунд перед передачею успішної HTML сторінки. Третій рукав той же, що й у блоці else з Блоку коду 20-9.

Ви можете побачити, наскільки примітивними є наш сервер: справжні бібліотеки оброблять розпізнавання кількох запитів у набагато менш розлогий спосіб!

Запустімо сервер командою cargo run. Тоді відкрийте два вікна браузера: одне для http://127.0.0.1:7878/, а інше - для http://127.0.0.1:7878/sleep. Якщо ви введете URL / кілька разів, то, як і раніше, ви побачите, що відповідь надходить швидко. Але якщо ви введете /sleep, а потім завантажте /, ви побачите, що / чекає, доки sleep "проспить" 5 секунд до завантаження.

Існує безліч методів, якими ми могли б скористатися, щоб уникнути гальмування через повільний запит; те, що ми реалізуємо - це пул потоків.

Поліпшення пропускної здатності за допомогою пулу потоків

Пул потоків - це група породжених потоків, що чекають і готові до обробки завдання. Коли програма отримує нове завдання, то призначає один із потоків з пулу на це завдання, і цей потік обробляє завдання. Решта потоків у пулі доступні, щоб обробити будь-яке інше завдання, що надійде, поки перший потік зайнятий обробкою. Коли перший потік завершить обробку свого завдання, то повернеться до пулу незайнятих потоків, готовий обробляти нове завдання. Пул потоків дозволяє вам обробляти з'єднання конкурентно, збільшуючи пропускну здатність вашого сервера.

Ми обмежимо кількість потоків в пулі невеликим числом, щоб захистити нас від атак на відмову в обслуговуванні (Denial of Service, DoS); якби наша програма створювала по потоку на кожен вхідний запит, то хтось, створивши 10 мільйонів запитів до нашого сервера, може обвалити його, вичерпавши всі його ресурси та призвівши до повної зупинки обробки запитів.

Замість необмеженого породження потоків ми матимемо фіксовану кількість потоків, що чекатимуть у пулі. Вхідні запити надсилатимуться в пул для обробки. Пул підтримуватиме чергу вхідних запитів. Кожен з потоків у пулі братиме запит з цієї черги, оброблятиме його і запитуватиме наступний запит з черги. З таким дизайном ми можемо обробити до N запитів конкурентно, де N є кількістю потоків. Якщо кожен потік відповідатиме на довгий запит, наступні запити все ж накопичуватиметься в черзі, але ми збільшили кількість довгих запитів, які ми можемо обробити до досягнення цього моменту.

Ця техніка - лише один із багатьох способів покращити пропускну здатність вебсервера. Інші варіанти, які ви можете дослідити, включають модель fork/join, однопотокова модель асинхронного I/O та *багатопотокова модель асинхронного I/O *. Якщо ви зацікавилися цією темою, то можете прочитати більше про інші рішення і спробувати реалізувати їх; усі ці варіанти доступні низькорівневій мові на кшталт Rust.

Перед тим, як почати реалізовувати пул тредів, поговоримо про те, як має виглядати його використання. Коли ви намагаєтеся проєктувати код, написання спершу клієнтського інтерфейсу може допомогти керувати проєктуванням. Напишіть API коду, щоб він був структурованим відносно способу його виклику; тоді реалізуйте функціональність відповідно до цієї структури, а не спершу реалізуйте функціональність, а тоді проєктуйте публічний API.

Подібно до того, як ми використовували керовану тестами розробку у проєкті з Розділу 12, ми використаємо тут керовану компілятором розробку. Ми напишемо код, що викликає потрібні нам функції, а потім ми подивимося на помилки компілятора, щоб вирішити, що ми маємо далі змінити, щоб цей код працював. Але перед цим ми дослідимо техніку, яку ми не збираємося використовувати, як відправну точку.

Породження потоку для кожного запиту

Спершу дослідимо, як наш код міг би виглядати, якби створював новий потік для кожного з'єднання. Як зазначено раніше, ми не плануємо так робити через проблеми з потенційним породженням нескінченої кількості потоків, але це вихідна точка, щоб спершу отримати робочий багатопотоковий сервер. Тоді ми покращимо код, додавши пул потоків, і відмінності між двома рішеннями стануть очевиднішими. Блок коду 20-11 показує зміни, які треба внести, щоб main породжував новий потік для обробки кожного потоку у циклі for.

Файл: src/main.rs

use std::{
    fs,
    io::{prelude::*, BufReader},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        thread::spawn(|| {
            handle_connection(stream);
        });
    }
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&mut stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}

Блок коду 20-11: породження нового потоку виконання для кожного вхідного потоку

Як ви дізналися у Розділі 16, thread::spawn створить новий потік, а потім запустить код у замиканні в цьому новому потоці. Якщо ви запустите цей код і завантажите в браузері /sleep, а тоді / у двох додаткових вкладках браузера, ви й справді побачите, що запит до / не мусить чекати, доки не завершиться /sleep. Однак, як ми згадували, це кінець-кінцем перенавантажить систему, бо нові потоки створюються без будь-яких обмежень.

Створення скінченної кількості потоків

Ми хочемо, щоб наш пул потоків працював у схожий, знайомий спосіб, щоб перехід з потоків до пулу потоків не вимагав значних змін у коді, що використовує наш API. Блок коду 20-12 показує гіпотетичний інтерфейс для структури ThreadPool, яку ми хочемо використати замість thread::spawn.

Файл: src/main.rs

use std::{
    fs,
    io::{prelude::*, BufReader},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&mut stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}

Блок коду 20-12: наш ідеальний інтерфейс ThreadPool

Ми використовуємо ThreadPool::new для створення нового пулу потоків налаштовуваним числом потоків, у цьому випадку чотирма. Тоді, в циклі for циклу, pool.execute має інтерфейс, подібний до thread::spawn у тому, що він для кожного вхідного потоку приймає замикання, яке пул має виконати. Ми маємо реалізувати pool.execute так, щоб він приймав замикання і передавав його треду в пулі на виконання. Цей код ще не компілюється, але ми спробуємо це зробити, щоб компілятор міг підказати, як це виправити.

Збірка ThreadPool за допомогою керованої компілятором розробки

Внесіть зміни з Блоку коду 20-12 до src/main.rs, а потім використаймо помилки компілятора з cargo check для керування розробкою. Ось яку першу помилку ми отримуємо:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0433]: failed to resolve: use of undeclared type `ThreadPool`
  --> src/main.rs:11:16
   |
11 |     let pool = ThreadPool::new(4);
   |                ^^^^^^^^^^ use of undeclared type `ThreadPool`

For more information about this error, try `rustc --explain E0433`.
error: could not compile `hello` due to previous error

Чудово! Ця помилка говорить, що нам потрібен тип чи модуль ThreadPool, тож ми його створимо. Наша реалізація ThreadPool буде незалежною від виду роботи, що її виконує наш вебсервер. Отже, переробимо крейт hello з двійкового крейта на бібліотеку, де міститиметься наша реалізація ThreadPool. Після перероблення на бібліотечний крейт ми також могли б використати окрему бібліотеку пулу потоків для будь-якої роботи, яку ми хочемо виконати за допомогою пулу потоків, а не лише для обслуговування вебзапитів.

Створіть src/lib.rs, що містить найпростіше визначення структури ThreadPool, яке ми можемо наразі мати:

Файл: src/lib.rs

pub struct ThreadPool;

Далі відредагуйте файл main.rs, щоб ввести ThreadPool до області видимості з бібліотечного крейта, додавши наступний код зверху src/main.rs:

Файл: src/lib.rs

use hello::ThreadPool;
use std::{
    fs,
    io::{prelude::*, BufReader},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&mut stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}

Цей код все ще не працює, але перевірмо його ще раз, щоб отримати наступну помилку, над якою нам потрібно буде працювати:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no function or associated item named `new` found for struct `ThreadPool` in the current scope
  --> src/main.rs:12:28
   |
12 |     let pool = ThreadPool::new(4);
   |                            ^^^ function or associated item not found in `ThreadPool`

For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` due to previous error

Ця помилка означає, що нам необхідно створити для ThreadPool асоційовану функцію з назвою new. Ми також знаємо, що new повинна мати один параметр, який може прийняти 4 як аргумент і має повернути екземпляр ThreadPool. Реалізуймо найпростішу функцію new, що матиме такі характеристики:

Файл: src/lib.rs

pub struct ThreadPool;

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        ThreadPool
    }
}

Ми обрали типом параметра size тип usize, бо ми знаємо що від'ємна кількість потоків не має сенсу. Ми також знаємо, що ми використаємо 4 як число елементів у колекції потоків, а це саме те, для чого призначений тип usize, як говорилося у підрозділі “Цілі типи” Розділу 3.

Ще раз перевіримо код:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no method named `execute` found for struct `ThreadPool` in the current scope
  --> src/main.rs:17:14
   |
17 |         pool.execute(|| {
   |              ^^^^^^^ method not found in `ThreadPool`

For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` due to previous error

Тепер стається помилка, бо ми не маємо методу execute на ThreadPool. Згадайте з підрозділу "Створення скінченної кількості потоків" , що ми вирішили, що інтерфейс нашого пула тредів має бути схожим на thread::spawn. На додачу ми реалізуємо функцію execute, щоб приймала передане їй замикання і передавало її вільному потоку з пула на виконання.

Ми визначимо метод execute для ThreadPool так, щоб він приймав параметром замикання. Згадайте з підрозділу “Переміщення захоплених значень із замикання та трейти Fn Розділу 13, що ми можемо приймати замикання параметрами за допомогою трьох різних трейтів: Fn, FnMut і FnOnce. Ми маємо вирішити, який тип замикань використовується тут. Ми знаємо, що в результаті вийде щось схоже на реалізацію thread::spawn зі стандартної бібліотеки, тож можемо подивитися на обмеження на параметр з сигнатури thread::spawn. Документація показує нам таке:

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,

Тип-параметр F - це те, що нас тут цікавить; тип-параметр T стосується значення, що повертається, і він нас не цікавить. Ми бачимо, що spawn використовує FnOnce як обмеження трейту для F. Ймовірно, це те саме, що нам треба, тому що ми зрештою передамо аргумент, який отримали у execute, до spawn. Ми можемо бути впевнені, що FnOnce - це трейт, який ми хочемо використовувати, оскільки потік для виконання запиту виконає замикання цього запиту тільки один раз, що відповідає Once у FnOnce.

Тип-параметр F також має трейтове обмеження Send і обмеження часу Існування 'static, що є корисним у нашій ситуації: нам потрібен Send, щоб передавати замикання від одного потоку до іншого, і 'static, бо ми не знаємо, скільки часу виконуватиметься потік. Створімо метод execute для ThreadPool, що прийматиме узагальнений параметр типу F із цими обмеженнями:

Файл: src/lib.rs

pub struct ThreadPool;

impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        ThreadPool
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

Ми все ще використовуємо () після FnOnce, бо FnOnce представляє замикання, що не приймає параметрів і повертає одиничний тип (). Як і у визначеннях функцій, тип, що повертається, можна не вказувати у сигнатурі, але навіть якщо ми не маємо параметрів, то все одно потребуємо дужки.

Знову ж таки, це найпростіша реалізація методу execute: вона не робить нічого, але ми намагаємося лише змусити наш код компілюватися. Ще раз перевіримо:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
    Finished dev [unoptimized + debuginfo] target(s) in 0.24s

Компілюється! Але зверніть увагу, що якщо ви спробуєте запустити cargo run і зробити запит у браузері, то побачите в браузері помилки, які ми вже бачили на початку розділу. Наша бібліотека ще не викликає замикання, передане до execute!

Примітка: ви могли чути, що про мови з жорсткими компіляторами, такими як Haskell and Rust, кажуть "якщо код компілюється, то він працює." Але це твердження не завжди правильне. Наш проєкт компілюється, але абсолютно нічого не робить! Якби ми збирали реальний, повний проєкт, це був би вдалий час почати написати юніт-тести, щоб перевірити, що код компілюється і має бажану поведінку.

Валідація числа потоків у new

Ми ще нічого не робимо з параметрами new та execute. Реалізуймо тіла цих функцій з бажаною для нас поведінкою. Для початку, подумаємо про new. Раніше ми вибрали беззнаковий тип для параметра size, бо пул з від'ємним числом потоків не має сенсу. Однак пул з нулем потоків також не має жодного сенсу, проте нуль є абсолютно валідним usize. Ми додамо код, щоб перевірити, чи size є більшим, ніж нуль, перш ніж повертати екземпляр ThreadPool і змусимо програму паніку якщо вона отримує нуль, використовуючи макрос assert!, як показано в Блоці коду 20-13.

Файл: src/lib.rs

pub struct ThreadPool;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        ThreadPool
    }

    // --snip--
    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

Блок коду 20-13: реалізація ThreadPool::new, що панікує, якщо size буде нулем

Ми також додали трохи документації до нашого ThreadPool документаційним коментарем. Зверніть увагу, що ми слідували за хорошими практиками документації, додавши розділ, який описує ситуації, в яких наша функція може панікувати, як обговорювалося в Розділі 14. Спробуйте запустити cargo doc --open і натисніть на структуру ThreadPool, щоб побачити як виглядає документація для new!

Замість додавати макрос assert!, як ми зробили тут, ми могли б змінити new на build і повертати Result, як ми робили з Config::build у проєкті I/O з Блоку коду 12-9. Але ми вирішили, що в цьому випадку створити пул потоків без жодного потоку має бути невиправною помилкою. Якщо ви почуваєтеся амбітним, спробуйте написати функцію, що зветься build, щоб порівняти з функцією new, з такою сигнатурою:

pub fn build(size: usize) -> Result<ThreadPool, PoolCreationError> {

Створення місця для зберігання потоків

Тепер, коли ми можемо переконатися, що маємо валідну кількість потоків для зберігання в пулі, ми можемо створити ці потоки і зберегти їх у структурі ThreadPool перед тим, як її повертати. Але як нам "зберегти" потік? Ще раз погляньмо на сигнатуру thread::spawn:

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,

Функція spawn повертає JoinHandle<T>, де T - тип, що повертає замикання. Спробуймо також використати JoinHandle і побачимо, що вийде. У нашому випадку, замикання, які ми передаємо до пулу потоків, будуть обробляти з'єднання, нічого не повертаючи, так що T буде одинчним типом ().

Код у Блоці коду 20-14 скомпілюється, але ще не створює жодних потоків. Ми змінили визначення ThreadPool, додавши в нього вектор екземплярів thread::JoinHandle<()>, ініціалізували цей вектор об'ємом size, організували цикл for, який виконуватиме певний код для створення потоків, та повернули екземпляр ThreadPool, що містить їх.

Файл: src/lib.rs

use std::thread;

pub struct ThreadPool {
    threads: Vec<thread::JoinHandle<()>>,
}

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut threads = Vec::with_capacity(size);

        for _ in 0..size {
            // create some threads and store them in the vector
        }

        ThreadPool { threads }
    }
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

Блок коду 20-14: створення вектора, що містить потоки, у ThreadPool

Ми ввели до області видимості std::thread з бібліотечного крейта, бо ми використовуємо thread::JoinHandle як тип елементів у векторі у ThreadPool.

Коли отримано валідний розмір, наш ThreadPool створює новий вектор, що може містити size елементів. Функція with_capacity виконує те саме завдання, що й Vec::new, але з важливою відмінністю: вона наперед виділяє місце у векторі. Оскільки ми знаємо, що нам потрібно зберігати size елементів у векторі, цей розподіл наперед є дещо ефективнішим, ніж використання Vec::new, який змінює розмір при вставленні елементів.

Коли ви знову запустите cargo check, він має відпрацювати успішно.

Структура Worker, відповідальна за пересилання коду з ThreadPool до потоку

Ми залишили коментар у циклі for у Блоці коду 20-14 про створення потоків. Тут ми розберемо, як насправді створювати потоки. Стандартна бібліотека уможливлює створення потоків через thread::spawn, який очікує отримати якийсь код, який потік має запустити, щойно його було створено. Однак у нашому випадку ми хочемо створити потоки, що очікують на код, який ми надішлемо пізніше. Реалізація зі стандартної бібліотеки не надає жодного способу це зробити; ми маємо реалізувати його вручну.

Ми реалізуємо таку поведінку, впровадивши нову структуру даних між ThreadPool і потоками, що оброблятиме цю поведінку. Ми назвемо цю структуру даних Worker, що є звичним терміном у реалізації пула. Worker бере код, який потрібно запустити і запускає його в своєму потоці. Уявіть працівників кухні в ресторані: вони чекають, поки не прийде замовлення від клієнтів, і тоді вони відповідають за прийняття цих замовлень і їхнє виконання.

Замість зберігання вектора екземплярів JoinHandle<()> у пулі потоків, Ми зберігатимемо екземпляри структуриWorker. Кожен Worker зберігатиме один екземпляр JoinHandle<()>. Тоді ми реалізуємо метод для Worker, який прийматиме замикання з кодом для запуску і відправлятиме його в уже робочий потік на виконання. Також ми надамо кожному worker id, щоб ми могли розрізняти різних worker в пулі для журналювання або налагодження.

Ось новий процес, що відбудеться, коли ми створимо ThreadPool. Ми реалізуємо код, який відправляє замикання в потік після того, як в нас вже є Worker, налаштований таким чином:

  1. Визначимо структуру Worker, яка містить id і JoinHandle<()>.
  2. Змінимо ThreadPool, щоб містив вектор екземплярів Worker.
  3. Визначимо функцію Worker::new, що приймає номер id і повертає екземпляр Worker, що містить id та потік, породжений із порожнім замиканням.
  4. У ThreadPool::new ми використовуємо лічильник циклу for, щоб згенерувати id, створити нового Worker з цим id, і зберегти worker у векторі.

If you’re up for a challenge, try implementing these changes on your own before looking at the code in Listing 20-15.

Готові? Ось Блок коду 20-15 з одним із можливих способів зробити описані зміни.

Файл: src/lib.rs

use std::thread;

pub struct ThreadPool {
    workers: Vec<Worker>,
}

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool { workers }
    }
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize) -> Worker {
        let thread = thread::spawn(|| {});

        Worker { id, thread }
    }
}

Блок коду 20-15: зміни до ThreadPool, щоб містив екземпляри Worker замість безпосередньо потоків

Ми змінили назву поля у ThreadPool з threads на workers, бо воно тепер містить екземпляри Worker замість JoinHandle<()>. Ми використовуємо лічильник циклу for циклі як аргумент Worker::new, і зберігаємо кожен новий Worker у векторі під назвою workers.

Зовнішній код (скажімо, наш сервер з src/main.rs) не має знати деталей реалізації стосовно використання структури Worker у ThreadPool, тож ми робимо структуру Worker і її функцію new приватними. Функція Worker::new використовує id, що ми їй передаємо, і зберігає екземпляр JoinHandle<()>, створений породженням нового потоку за допомогою порожнього замикання.

Примітка: якщо операційна система не може створити потік через нестачу системних ресурсів, thread::spawn панікуватиме. Це призведе до паніки усього нашого сервера, навіть якщо створення деяких потоків і буде вдалим. Заради простоти ця поведінка прийнятна, але у виробничій реалізації пулу потоків ви, швидше за все, захочете скористатися std::thread::Builder і його методом spawn , що повертає натомість Result.

Цей код скомпілюється і зберігатиме кількість екземплярів Worker, яку ми передали як аргумент для ThreadPool::new. Але ми все ще не обробляємо замикання, які ми отримали у execute. Подивімося, як це зробити, далі.

Надсилання запитів до потоків через канали

Наступна проблема, якою ми займемося, полягає в тому, що замикання, передані thread::spawn, не роблять абсолютно нічого. Наразі ми отримуємо замикання, що хочемо виконати, у методі execute. Але ми маємо передати до thread::spawn якесь замикання, коли ми створюємо кожного Worker при створенні ThreadPool.

Ми хочемо, щоб щойно створені структури Worker отримували код з черги, що міститься в ThreadPool, і відправляли цей код у свій потік на виконання.

Канали, про які ми дізналися у Розділі 16 — простий спосіб спілкування між двома потоками — ідеально підходять для цього випадку. Ми скористаємося каналом як чергою завдань, і execute відправить завдання з ThreadPool до екземплярів Worker, які перешлють завдання до своїх потоків. Ось наш план:

  1. ThreadPool створить канал і утримуватиме відправника.
  2. Кожен Worker утримуватиме отримувача.
  3. Ми створимо нову структуру Job, що міститиме замикання, що їх ми хочемо відправити каналом.
  4. Метод execute відправить завдання, яке треба виконати, через відправника.
  5. У своєму потоці Worker буде в циклі запитувати свого отримувача і виконувати замикання з отриманих завдань.

Почнімо з створення каналу в ThreadPool::new та утримання відправника у екземплярі ThreadPool, як показано у Блоці коду 20-16. Структура Job наразі не містить нічого, але буде типом елементів, що їх ми відправляємо каналом.

Файл: src/lib.rs

use std::{sync::mpsc, thread};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool { workers, sender }
    }
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize) -> Worker {
        let thread = thread::spawn(|| {});

        Worker { id, thread }
    }
}

Блок коду 20-16: змінюємо ThreadPool, зберігаючи відправника каналу, що передає екземпляри Job

У ThreadPool::newми створюємо новий канал і пул тепер містить відправника. Це успішно компілюється.

Спробуймо передати отримувача каналу усім worker, коли пул потоків створює канал. Ми знаємо, що хотіли б використати приймач у потоці, породженому worker, тож ми посилатимемося на параметр receiver у замиканні. Код у Блоці коду 20-17 поки що не компілюється.

Файл: src/lib.rs

use std::{sync::mpsc, thread};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, receiver));
        }

        ThreadPool { workers, sender }
    }
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

// --snip--


struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
        let thread = thread::spawn(|| {
            receiver;
        });

        Worker { id, thread }
    }
}

Блок коду 20-17: передавання приймача до worker

Ми зробили деякі дрібні і очевидні зміни: ми передаємо приймач до Worker::new, а потім використовуємо його всередині замикання.

Коли ми спробуємо перевірити цей код, то отримаємо таку помилку:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0382]: use of moved value: `receiver`
  --> src/lib.rs:26:42
   |
21 |         let (sender, receiver) = mpsc::channel();
   |                      -------- move occurs because `receiver` has type `std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait
...
26 |             workers.push(Worker::new(id, receiver));
   |                                          ^^^^^^^^ value moved here, in previous iteration of loop

For more information about this error, try `rustc --explain E0382`.
error: could not compile `hello` due to previous error

Код намагається передати receiver кільком екземплярам Worker. Так не виходить, бо, як ви пам'ятаєте з Розділу 16, реалізація каналу в Rust має багатьох виробників і одного споживача. Це означає, що ми не можемо просто клонувати споживацький вихід каналу, щоб виправити код. Також ми не хочемо надсилати повідомлення кілька разів декільком споживачам; ми хочемо єдиниц список повідомлень з декількома worker, таким чином, щоб кожне повідомлення було оброблене один раз.

На додачу, приймання завдання з черги в каналі включає зміну receiver, тож потокам потрібен безпечний спосіб спільно використовувати та змінювати receiver; інакше ми можемо отримати стан гонитви (як розповідалося в Розділі 16).

Згадайте потокобезпечні розумні вказівники, про які йшлося в Розділі 16: щоб розділити володіння між кількома потоками і дозволити потокам змінювати значення, нам треба було скористатися Arc<Mutex<T>>. Тип Arc дозволить кільком worker володіти приймачем, а Mutex гарантує, що лише один worker отримує завдання з приймача за раз. Блок коду 20-18 показує зміни, які ми маємо зробити.

Файл: src/lib.rs

use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};
// --snip--

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

// --snip--

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        // --snip--
        let thread = thread::spawn(|| {
            receiver;
        });

        Worker { id, thread }
    }
}

Блок коду 20-18: спільне використання приймача worker за допомогою Arc і Mutex

У ThreadPool::new, ми розміщуємо приймач у Arc і Mutex. Для кожного нового worker ми клонуємо Arc, щоб збільшити лічильник посилань, щоб worker могли спільно володіти приймачем.

З цими змінами, код компілюється! Ми вже близько!

Реалізація методу execute

Нарешті реалізуймо метод execute для ThreadPool. Також ми змінимо Job зі структури на псевдонім типу для трейтового об'єкта, який містить тип замикання, яку приймає execute. Як уже говорилося в підрозділі “Створення типів-синонімів за допомогою псевдонімів типів” Розділу 19, псевдоніми типів дозволяють нам скорочувати довгі типи для простоти використання. Подивіться на Блок коду 20-19.

Файл: src/lib.rs

use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

// --snip--

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

// --snip--

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(|| {
            receiver;
        });

        Worker { id, thread }
    }
}

Блок коду 20-19: створення псевдоніма типу Job для Box, що містить кожне замикання, і відправлення завдання каналом

Після створення нового екземпляра Job за допомогою замикання, що ми отримали в execute, ми підправляємо це завдання через вхід каналу. Ми викликаємо unwrap для send на випадок, якщо відправлення буде невдалим. Це може статися якщо, наприклад, ми зупинимо всі потоки, тобто вихід каналу припинить отримувати нові повідомлення. На цей час ми не можемо зупинити наші потоки: вони продовжують виконуватись, доки пул існує. Причина, чому ми використовуємо unwrap, полягає в тому, що ми знаємо, що невдача тут неможлива, але компілятор цього не знає.

Та ми ще не зовсім закінчили! У worker наше замикання, що передається до thread::spawn, лише посилається на вихід каналу. Натомість нам треба, щоб замикання у вічному циклі отримувало з вихідного кінця каналу завдання і після отримання виконувало його. Зробімо зміни, показані в Блоці коду 20-20, у Worker::new.

Файл: src/lib.rs

use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            let job = receiver.lock().unwrap().recv().unwrap();

            println!("Worker {id} got a job; executing.");

            job();
        });

        Worker { id, thread }
    }
}

Блок коду 20-20: отримання і виконання завдань у потоці worker

Тут ми спершу викликаємо lock для receiver, щоб отримати м'ютекс, а потім викликаємо unwrap для паніки, якщо сталася якась помилка. Здійснення блокування може призвести до невдачі, якщо м'ютекс знаходиться у стані poisoned, що може статися, якщо якийсь інший потік запанікував, поки утримував блокування, а не відпустив його. У цій ситуації виклик unwrap для паніки є правильною дією. Можете за бажання змінити unwrap на expect зі змістовним для вас повідомленням про помилку.

Якщо ми отримали блокування м'ютекса, то викличемо recv, щоб отримати Job з каналу. Останній unwrap також покриває всі помилки, що могли виникнути якщо потік, що утримує відправника, завершився, так само як метод send повертає Err, якщо отримувач завершився.

Виклик recv блокує, тож якщо завдань немає, поточний потік чекатиме, доки не з'явиться доступне завдання. Mutex<T> гарантує, що лише один потік worker за раз намагатиметься отримати завдання.

Наш пул потоків нарешті у робочому стані! Виконайте cargo run і зробіть кілька запитів:

$ cargo run
   Compiling hello v0.1.0 (file:///projects/hello)
warning: field is never read: `workers`
 --> src/lib.rs:7:5
  |
7 |     workers: Vec<Worker>,
  |     ^^^^^^^^^^^^^^^^^^^^
  |
  = note: `#[warn(dead_code)]` on by default

warning: field is never read: `id`
  --> src/lib.rs:48:5
   |
48 |     id: usize,
   |     ^^^^^^^^^

warning: field is never read: `thread`
  --> src/lib.rs:49:5
   |
49 |     thread: thread::JoinHandle<()>,
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

warning: `hello` (lib) generated 3 warnings
    Finished dev [unoptimized + debuginfo] target(s) in 1.40s
     Running `target/debug/hello`
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.

Успіх! Тепер у нас є пул потоків, який виконує з'єднання асинхронно. Також ніколи не буде створено більше ніж чотири потоки, тому наша система не перенавантажиться, якщо сервер отримає забагато запитів. Якщо ми робимо запит до /sleep, сервер буде мати можливість обслуговувати інші запити, бо їх виконувати буде інший потік.

Примітка: якщо ви відкриєте /sleep в декількох вікнах браузера одночасно, вони можуть вантажитися по одному з 5-секундним інтервалом. Деякі веббраузери виконують кілька екземплярів одного запиту послідовно для потреб кешування. Це обмеження не викликане нашим вебсервером.

Після вивчення циклу while let у Розділі 18, ви можете поцікавитися, чому ми не написали код потоку worker, як показано в Блоці коду 20-21.

Файл: src/lib.rs

use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}
// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            while let Ok(job) = receiver.lock().unwrap().recv() {
                println!("Worker {id} got a job; executing.");

                job();
            }
        });

        Worker { id, thread }
    }
}

Блок коду 20-21: альтернативна реалізація Worker::new за допомогою while let

Цей код компілюється і запускається, але не дає бажаного багатопотокового результату: повільний запит усе ще змушує інші потоки чекати на обробку. Причина дещо тонка: структура Mutex не має публічного методу unlock, тому що володіння блокуванням базується на часі існування MutexGuard<T> у LockResult<MutexGuard<T>>, повернутим методом lock. Під час компіляції borrow checker може гарантувати правило, що ресурс, захищений Mutex, не буде доступним, якщо ми не маємо блокування. Однак ця реалізація також призведе до того, що блокування буде утримуватися довше, ніж потрібно, якщо ми не пам'ятаємо про час існування MutexGuard<T>.

Код у Блоці коду 20-20, що робить let job = receiver.lock().unwrap().recv().unwrap();, працює, бо в let будь-які тимчасові значення, використані у правій стороні знаку рівності, негайно очищуються, коли завершується інструкція let. Проте, while letif let та match) не очищують тимчасові значення до кінця відповідного блоку. У Блоці коду 20-21 блокування утримується на час виклику job(), тобто інші worker не можуть отримувати завдання. ch19-04-advanced-types.html#creating-type-synonyms-with-type-aliases ch13-01-closures.html#moving-captured-values-out-of-the-closure-and-the-fn-traits