Джейд Картер "Библиотеки Python Часть 2. Практическое применение"

От анализа больших данных и машинного обучения до автоматизации рутинных процессов и создания интерактивных визуализаций – эта часть станет вашим практическим путеводителем. Вы узнаете, как распределенно обрабатывать данные с помощью Dask и PySpark, строить динамические дашборды с Plotly и Dash, оптимизировать производительность моделей с Cython, и разрабатывать высоконагруженные приложения с использованием Asyncio и CUDA. Кроме того, особое внимание уделено автоматизации задач, включая парсинг данных, обработку документов и создание рабочих процессов с Airflow. Визуализация геоданных, работа с изображениями и звуком, а также современные подходы к тестированию и развертыванию приложений помогут вам интегрировать Python в самые разнообразные проекты. Эта часть предназначена для разработчиков, стремящихся расширить свои навыки и внедрять Python в практические сферы, требующие высокую производительность, автоматизацию и гибкость.

date_range Год издания :

foundation Издательство :Автор

person Автор :

workspaces ISBN :

child_care Возрастное ограничение : 12

update Дата обновления : 29.01.2025


# Сортировка данных по дате

sorted_df = df.orderBy('transaction_date')

# Сохранение отсортированных данных в новый файл

sorted_df.write.csv('sorted_transactions', header=True, mode='overwrite')

print("Данные отсортированы и сохранены.")

# Завершаем сессию Spark

spark.stop()

```

Объяснение:

– `orderBy` сортирует данные по указанному столбцу.

– `write.csv` сохраняет результат в новом файле.

Эти задачи демонстрируют, как использовать Dask и PySpark для работы с большими объемами данных.

– Dask подходит для локальных задач и интеграции с Python-библиотеками.

– PySpark эффективен для кластерной обработки данных и интеграции с экосистемой Hadoop.

Обе библиотеки упрощают решение задач, которые сложно выполнить традиционными методами из-за ограничений памяти или мощности процессора.

1.2 Потоковая обработка данных с Apache Kafka

Apache Kafka – это мощная платформа для обработки потоков данных в реальном времени. Она широко используется для обработки и анализа событий, поступающих из различных источников, таких как веб-серверы, базы данных, датчики IoT, системы мониторинга и многое другое. Kafka обеспечивает высокую производительность, надежность и масштабируемость, что делает её одним из лучших инструментов для потоковой обработки данных.

В основе Apache Kafka лежат несколько ключевых компонентов:

1. Брокеры – серверы, которые принимают, хранят и доставляют данные.

2. Топики – логические каналы, через которые данные передаются.

3. Продюсеры – приложения или устройства, которые отправляют данные в Kafka.

4. Консьюмеры – приложения, которые получают данные из Kafka.

Kafka организует поток данных в виде последовательностей сообщений. Сообщения записываются в топики и разделяются на партиции, что позволяет обрабатывать данные параллельно.

Пример потока данных

Представим, что у нас есть система интернет-магазина, где Kafka используется для обработки событий, таких как заказы, клики на странице, добавление товаров в корзину и платежи. Каждое из этих событий записывается в топик Kafka. Например, топик `orders` может содержать события, описывающие новые заказы.

Установка и настройка Apache Kafka

Перед началом работы убедитесь, что Kafka установлена. Для локальной работы используйте официальные сборки Kafka с сайта [Apache Kafka](https://kafka.apache.org/).

1. Установите Kafka и запустите ZooKeeper, необходимый для управления брокерами.

2. Запустите Kafka-брокер.

3. Создайте топик с помощью команды:

```bash

bin/kafka-topics.sh –create –topic orders –bootstrap-server localhost:9092 –partitions 3 –replication-factor 1

```

Отправка данных в Kafka

Теперь создадим простого продюсера на Python, который будет отправлять данные в топик `orders`. Для работы с Kafka на Python используется библиотека `confluent-kafka`. Установите её с помощью команды:

```bash

pip install confluent-kafka

```

Пример кода, который отправляет сообщения в топик:

```python

from confluent_kafka import Producer

import json

import time

# Настройки продюсера

producer_config = {

'bootstrap.servers': 'localhost:9092' # Адрес Kafka-брокера

}

# Создание продюсера

producer = Producer(producer_config)

# Функция для обратного вызова при успешной отправке сообщения

def delivery_report(err, msg):

if err is not None:

print(f'Ошибка доставки сообщения: {err}')

else:

print(f'Сообщение отправлено: {msg.topic()} [{msg.partition()}]')

# Отправка данных в Kafka

orders = [

{'order_id': 1, 'product': 'Laptop', 'price': 1000},

{'order_id': 2, 'product': 'Phone', 'price': 500},

{'order_id': 3, 'product': 'Headphones', 'price': 150}

]

for order in orders:

producer.produce(

'orders',

key=str(order['order_id']),

value=json.dumps(order),

callback=delivery_report

)

producer.flush() # Отправка сообщений в брокер

time.sleep(1)

```

В этом примере продюсер отправляет JSON-объекты в топик `orders`. Каждое сообщение содержит данные о заказе.

Чтение данных из Kafka

Теперь создадим консьюмера, который будет читать сообщения из топика `orders`.

```python

from confluent_kafka import Consumer, KafkaException

# Настройки консьюмера

consumer_config = {

'bootstrap.servers': 'localhost:9092',

'group.id': 'order-group', # Группа консьюмеров

'auto.offset.reset': 'earliest' # Начало чтения с первой записи

}

# Создание консьюмера

consumer = Consumer(consumer_config)

# Подписка на топик

consumer.subscribe(['orders'])

# Чтение сообщений из Kafka

try:

while True:

msg = consumer.poll(1.0) # Ожидание сообщения (1 секунда)

if msg is None:

continue

if msg.error():

if msg.error().code() == KafkaException._PARTITION_EOF:

# Конец партиции

Все книги на сайте предоставены для ознакомления и защищены авторским правом