Код IT
← Каталог

Python — сравнение sequential, threads, asyncio, processes

Практикум «Последовательное и параллельное выполнение»: I/O и CPU, tqdm, ASCII-график, GIL.

python asyncconcurrencyencyclopedia3 embed URL статья в энциклопедии
Python main.py
"""
Сравнение sequential, threading, asyncio и multiprocessing на I/O и CPU задачах.
Практикум: https://spirzen.ru/encyclopedia/4-code-dev/4-05-asinhronnost/3
"""
import asyncio
import multiprocessing
import random
import sys
import threading
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

try:
    from tqdm import tqdm

    HAS_TQDM = True
except ImportError:
    HAS_TQDM = False
    print("Установите tqdm: pip install tqdm", file=sys.stderr)


# --- I/O-bound (симуляция сети) ---


def io_task(task_id, delay):
    time.sleep(delay)
    return f"IO-задача #{task_id} (задержка {delay:.2f}с)"


async def io_task_async(task_id, delay):
    await asyncio.sleep(delay)
    return f"IO-задача #{task_id} (задержка {delay:.2f}с)"


# --- CPU-bound (вычисления) ---


def cpu_task(task_id, size=5_000_000):
    result = 0
    for i in range(size):
        result += i * i
    return f"CPU-задача #{task_id} (вычислено {size} итераций)"


def run_sequential(tasks, task_type="io"):
    print(f"\n{'=' * 60}")
    print(f"ПОСЛЕДОВАТЕЛЬНЫЙ РЕЖИМ ({task_type.upper()})")
    print("=" * 60)

    start_time = time.time()
    results = []
    iterator = tqdm(tasks, desc="Выполнение", unit="задач") if HAS_TQDM else tasks

    for task_id, value in iterator:
        if task_type == "io":
            results.append(io_task(task_id, value))
        else:
            results.append(cpu_task(task_id, value))

    elapsed = time.time() - start_time
    print(f"\nПоследовательно: {elapsed:.2f} сек")
    return results, elapsed


def run_threads(tasks, max_workers=10):
    print(f"\n{'=' * 60}")
    print(f"ПОТОКИ — {len(tasks)} задач, {max_workers} воркеров")
    print("=" * 60)

    start_time = time.time()
    results = []

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(io_task if isinstance(delay, float) else cpu_task, task_id, delay)
            for task_id, delay in tasks
        ]
        iterator = tqdm(futures, desc="Выполнение", unit="задач") if HAS_TQDM else futures
        for future in iterator:
            results.append(future.result())

    elapsed = time.time() - start_time
    print(f"\nПотоки: {elapsed:.2f} сек")
    return results, elapsed


def run_processes(tasks, max_workers=None):
    if max_workers is None:
        max_workers = multiprocessing.cpu_count()

    print(f"\n{'=' * 60}")
    print(f"МНОГОПРОЦЕССОРНОСТЬ — {len(tasks)} задач, {max_workers} ядер")
    print("=" * 60)

    start_time = time.time()
    results = []

    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(cpu_task, task_id, size) for task_id, size in tasks]
        iterator = tqdm(futures, desc="Выполнение", unit="задач") if HAS_TQDM else futures
        for future in iterator:
            results.append(future.result())

    elapsed = time.time() - start_time
    print(f"\nПроцессы: {elapsed:.2f} сек")
    return results, elapsed


async def run_async_io(tasks):
    print(f"\n{'=' * 60}")
    print(f"ASYNCIO — {len(tasks)} задач")
    print("=" * 60)

    start_time = time.time()
    async_tasks = [asyncio.create_task(io_task_async(task_id, delay)) for task_id, delay in tasks]

    if HAS_TQDM:
        results = []
        for i, task in enumerate(async_tasks, 1):
            results.append(await task)
            print(
                f"\rПрогресс: [{'#' * i}{'.' * (len(tasks) - i)}] {i}/{len(tasks)}",
                end="",
            )
        print()
    else:
        results = await asyncio.gather(*async_tasks)

    elapsed = time.time() - start_time
    print(f"\nАсинхронно: {elapsed:.2f} сек")
    return results, elapsed


def visualize_comparison(results_dict):
    print("\n" + "=" * 60)
    print("ВИЗУАЛИЗАЦИЯ РЕЗУЛЬТАТОВ")
    print("=" * 60)

    max_time = max(results_dict.values())
    sorted_results = sorted(results_dict.items(), key=lambda x: x[1])

    print("\nСравнение времени выполнения:\n")
    for name, time_val in sorted_results:
        bar_length = int((time_val / max_time) * 50)
        bar = "█" * bar_length + "░" * (50 - bar_length)
        best_marker = " (лучший)" if time_val == sorted_results[0][1] else ""
        print(f"{name:20} {bar} {time_val:6.2f} сек{best_marker}")

    if "sequential (IO)" in results_dict or "sequential (CPU)" in results_dict:
        base_key = next(k for k in results_dict if k.startswith("sequential"))
        base_time = results_dict[base_key]
        print(f"\nУскорение относительно {base_key}:")
        for name, time_val in sorted_results:
            if name != base_key:
                print(f"  {name}: {base_time / time_val:.2f}x")


def main():
    print("СРАВНЕНИЕ ПОДХОДОВ К ПАРАЛЛЕЛИЗМУ")
    print("=" * 60)

    print("\nI/O-BOUND (симуляция сети)")
    print("-" * 40)

    num_io_tasks = 15
    io_tasks = [(i, random.uniform(0.5, 2.0)) for i in range(num_io_tasks)]

    _, seq_io_time = run_sequential(io_tasks, "io")
    _, threads_io_time = run_threads(io_tasks, max_workers=8)
    _, async_io_time = asyncio.run(run_async_io(io_tasks))

    print("\n\nCPU-BOUND (интенсивные вычисления)")
    print("-" * 40)

    num_cpu_tasks = multiprocessing.cpu_count() * 2
    cpu_tasks = [(i, random.randint(3_000_000, 7_000_000)) for i in range(num_cpu_tasks)]

    _, seq_cpu_time = run_sequential(cpu_tasks, "cpu")
    _, proc_cpu_time = run_processes(cpu_tasks)

    print("\nВНИМАНИЕ: потоки для CPU из-за GIL почти не ускоряют!")
    _, threads_cpu_time = run_threads(cpu_tasks, max_workers=multiprocessing.cpu_count())

    io_compare = {
        "sequential (IO)": seq_io_time,
        "threads (IO)": threads_io_time,
        "async (IO)": async_io_time,
    }
    cpu_compare = {
        "sequential (CPU)": seq_cpu_time,
        "processes (CPU)": proc_cpu_time,
        "threads (CPU)": threads_cpu_time,
    }

    print("\nI/O задачи:")
    visualize_comparison(io_compare)
    print("\nCPU задачи:")
    visualize_comparison(cpu_compare)

    print(f"\nЯдер CPU: {multiprocessing.cpu_count()}")
    print(f"Задач I/O: {num_io_tasks}, CPU: {num_cpu_tasks}")


if __name__ == "__main__":
    multiprocessing.freeze_support()
    main()
"""
Сравнение sequential, threading, asyncio и multiprocessing на I/O и CPU задачах.
Практикум: https://spirzen.ru/encyclopedia/4-code-dev/4-05-asinhronnost/3
"""
import asyncio
import multiprocessing
import random
import sys
import threading
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

try:
    from tqdm import tqdm

    HAS_TQDM = True
except ImportError:
    HAS_TQDM = False
    print("Установите tqdm: pip install tqdm", file=sys.stderr)


# --- I/O-bound (симуляция сети) ---


def io_task(task_id, delay):
    time.sleep(delay)
    return f"IO-задача #{task_id} (задержка {delay:.2f}с)"


async def io_task_async(task_id, delay):
    await asyncio.sleep(delay)
    return f"IO-задача #{task_id} (задержка {delay:.2f}с)"


# --- CPU-bound (вычисления) ---


def cpu_task(task_id, size=5_000_000):
    result = 0
    for i in range(size):
        result += i * i
    return f"CPU-задача #{task_id} (вычислено {size} итераций)"


def run_sequential(tasks, task_type="io"):
    print(f"\n{'=' * 60}")
    print(f"ПОСЛЕДОВАТЕЛЬНЫЙ РЕЖИМ ({task_type.upper()})")
    print("=" * 60)

    start_time = time.time()
    results = []
    iterator = tqdm(tasks, desc="Выполнение", unit="задач") if HAS_TQDM else tasks

    for task_id, value in iterator:
        if task_type == "io":
            results.append(io_task(task_id, value))
        else:
            results.append(cpu_task(task_id, value))

    elapsed = time.time() - start_time
    print(f"\nПоследовательно: {elapsed:.2f} сек")
    return results, elapsed


def run_threads(tasks, max_workers=10):
    print(f"\n{'=' * 60}")
    print(f"ПОТОКИ — {len(tasks)} задач, {max_workers} воркеров")
    print("=" * 60)

    start_time = time.time()
    results = []

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(io_task if isinstance(delay, float) else cpu_task, task_id, delay)
            for task_id, delay in tasks
        ]
        iterator = tqdm(futures, desc="Выполнение", unit="задач") if HAS_TQDM else futures
        for future in iterator:
            results.append(future.result())

    elapsed = time.time() - start_time
    print(f"\nПотоки: {elapsed:.2f} сек")
    return results, elapsed


def run_processes(tasks, max_workers=None):
    if max_workers is None:
        max_workers = multiprocessing.cpu_count()

    print(f"\n{'=' * 60}")
    print(f"МНОГОПРОЦЕССОРНОСТЬ — {len(tasks)} задач, {max_workers} ядер")
    print("=" * 60)

    start_time = time.time()
    results = []

    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(cpu_task, task_id, size) for task_id, size in tasks]
        iterator = tqdm(futures, desc="Выполнение", unit="задач") if HAS_TQDM else futures
        for future in iterator:
            results.append(future.result())

    elapsed = time.time() - start_time
    print(f"\nПроцессы: {elapsed:.2f} сек")
    return results, elapsed


async def run_async_io(tasks):
    print(f"\n{'=' * 60}")
    print(f"ASYNCIO — {len(tasks)} задач")
    print("=" * 60)

    start_time = time.time()
    async_tasks = [asyncio.create_task(io_task_async(task_id, delay)) for task_id, delay in tasks]

    if HAS_TQDM:
        results = []
        for i, task in enumerate(async_tasks, 1):
            results.append(await task)
            print(
                f"\rПрогресс: [{'#' * i}{'.' * (len(tasks) - i)}] {i}/{len(tasks)}",
                end="",
            )
        print()
    else:
        results = await asyncio.gather(*async_tasks)

    elapsed = time.time() - start_time
    print(f"\nАсинхронно: {elapsed:.2f} сек")
    return results, elapsed


def visualize_comparison(results_dict):
    print("\n" + "=" * 60)
    print("ВИЗУАЛИЗАЦИЯ РЕЗУЛЬТАТОВ")
    print("=" * 60)

    max_time = max(results_dict.values())
    sorted_results = sorted(results_dict.items(), key=lambda x: x[1])

    print("\nСравнение времени выполнения:\n")
    for name, time_val in sorted_results:
        bar_length = int((time_val / max_time) * 50)
        bar = "█" * bar_length + "░" * (50 - bar_length)
        best_marker = " (лучший)" if time_val == sorted_results[0][1] else ""
        print(f"{name:20} {bar} {time_val:6.2f} сек{best_marker}")

    if "sequential (IO)" in results_dict or "sequential (CPU)" in results_dict:
        base_key = next(k for k in results_dict if k.startswith("sequential"))
        base_time = results_dict[base_key]
        print(f"\nУскорение относительно {base_key}:")
        for name, time_val in sorted_results:
            if name != base_key:
                print(f"  {name}: {base_time / time_val:.2f}x")


def main():
    print("СРАВНЕНИЕ ПОДХОДОВ К ПАРАЛЛЕЛИЗМУ")
    print("=" * 60)

    print("\nI/O-BOUND (симуляция сети)")
    print("-" * 40)

    num_io_tasks = 15
    io_tasks = [(i, random.uniform(0.5, 2.0)) for i in range(num_io_tasks)]

    _, seq_io_time = run_sequential(io_tasks, "io")
    _, threads_io_time = run_threads(io_tasks, max_workers=8)
    _, async_io_time = asyncio.run(run_async_io(io_tasks))

    print("\n\nCPU-BOUND (интенсивные вычисления)")
    print("-" * 40)

    num_cpu_tasks = multiprocessing.cpu_count() * 2
    cpu_tasks = [(i, random.randint(3_000_000, 7_000_000)) for i in range(num_cpu_tasks)]

    _, seq_cpu_time = run_sequential(cpu_tasks, "cpu")
    _, proc_cpu_time = run_processes(cpu_tasks)

    print("\nВНИМАНИЕ: потоки для CPU из-за GIL почти не ускоряют!")
    _, threads_cpu_time = run_threads(cpu_tasks, max_workers=multiprocessing.cpu_count())

    io_compare = {
        "sequential (IO)": seq_io_time,
        "threads (IO)": threads_io_time,
        "async (IO)": async_io_time,
    }
    cpu_compare = {
        "sequential (CPU)": seq_cpu_time,
        "processes (CPU)": proc_cpu_time,
        "threads (CPU)": threads_cpu_time,
    }

    print("\nI/O задачи:")
    visualize_comparison(io_compare)
    print("\nCPU задачи:")
    visualize_comparison(cpu_compare)

    print(f"\nЯдер CPU: {multiprocessing.cpu_count()}")
    print(f"Задач I/O: {num_io_tasks}, CPU: {num_cpu_tasks}")


if __name__ == "__main__":
    multiprocessing.freeze_support()
    main()