Джейд Картер "Библиотеки Python Часть 2. Практическое применение"

От анализа больших данных и машинного обучения до автоматизации рутинных процессов и создания интерактивных визуализаций – эта часть станет вашим практическим путеводителем. Вы узнаете, как распределенно обрабатывать данные с помощью Dask и PySpark, строить динамические дашборды с Plotly и Dash, оптимизировать производительность моделей с Cython, и разрабатывать высоконагруженные приложения с использованием Asyncio и CUDA. Кроме того, особое внимание уделено автоматизации задач, включая парсинг данных, обработку документов и создание рабочих процессов с Airflow. Визуализация геоданных, работа с изображениями и звуком, а также современные подходы к тестированию и развертыванию приложений помогут вам интегрировать Python в самые разнообразные проекты. Эта часть предназначена для разработчиков, стремящихся расширить свои навыки и внедрять Python в практические сферы, требующие высокую производительность, автоматизацию и гибкость.

date_range Год издания :

foundation Издательство :Автор

person Автор :

workspaces ISBN :

child_care Возрастное ограничение : 12

update Дата обновления : 29.01.2025


'auto.offset.reset': 'earliest'

})

product_consumer = Consumer({

'bootstrap.servers': broker,

'group.id': 'product-group',

'auto.offset.reset': 'earliest'

})

order_consumer.subscribe(['orders'])

product_consumer.subscribe(['products'])

# Словарь для хранения данных о товарах

product_catalog = {}

try:

while True:

# Чтение данных из топика products

product_msg = product_consumer.poll(0.1)

if product_msg and not product_msg.error():

product = json.loads(product_msg.value().decode('utf-8'))

product_catalog[product['product_id']] = {

'name': product['product_name'],

'price': product['price']

}

# Чтение данных из топика orders

order_msg = order_consumer.poll(0.1)

if order_msg and not order_msg.error():

order = json.loads(order_msg.value().decode('utf-8'))

product_id = order['product_id']

# Объединение данных о заказе и товаре

if product_id in product_catalog:

product = product_catalog[product_id]

total_price = order['quantity'] * product['price']

print(f"Заказ {order['order_id']}: {product['name']} x {order['quantity']} = {total_price} $")

else:

print(f"Информация о товаре {product_id} отсутствует.")

except KeyboardInterrupt:

print("Завершение работы.")

finally:

order_consumer.close()

product_consumer.close()

```

Объяснение:

– Данные из топика `products` кэшируются в словаре `product_catalog`.

– При чтении заказа из топика `orders` программа объединяет данные и вычисляет итоговую стоимость.

Задача 7: Потоковая обработка с вычислением скользящего среднего

Описание:

В топик `stock_prices` поступают данные о ценах акций:

– `symbol` – тикер акции.

– `price` – текущая цена.

– `timestamp` – время.

Ваша задача: вычислять скользящее среднее цены акции за последние 5 сообщений для каждого тикера.

Решение:

```python

from confluent_kafka import Consumer

import json

from collections import defaultdict, deque

# Настройки Kafka

broker = 'localhost:9092'

# Создание консьюмера

consumer = Consumer({

'bootstrap.servers': broker,

'group.id': 'stocks-group',

'auto.offset.reset': 'earliest'

})

consumer.subscribe(['stock_prices'])

# Дек для хранения последних цен по тикерам

price_window = defaultdict(lambda: deque(maxlen=5))

try:

while True:

msg = consumer.poll(1.0)

if msg is None:

continue

if msg.error():

continue

# Преобразуем сообщение в Python-объект

stock_data = json.loads(msg.value().decode('utf-8'))

# Добавляем цену в окно

symbol = stock_data['symbol']

price_window[symbol].append(stock_data['price'])

# Вычисляем скользящее среднее

moving_average = sum(price_window[symbol]) / len(price_window[symbol])

print(f"Скользящее среднее для {symbol}: {moving_average:.2f}")

except KeyboardInterrupt:

print("Завершение работы.")

finally:

consumer.close()

```

Объяснение:

– Используется `deque` для хранения последних 5 цен.

– Скользящее среднее вычисляется как сумма значений, делённая на их количество.

Задача 8: Генерация уведомлений

Описание:

В топик `user_actions` поступают данные о действиях пользователей:

– `user_id` – идентификатор пользователя.

– `action` – выполненное действие (например, "login", "purchase").

Напишите программу, которая отслеживает пользователей, выполнивших вход (`login`), но не совершивших покупку (`purchase`) в течение 10 минут, и отправляет уведомление в топик `notifications`.

Решение:

```python

from confluent_kafka import Consumer, Producer

import json

Все книги на сайте предоставены для ознакомления и защищены авторским правом