Джейд Картер "Библиотеки Python Часть 2. Практическое применение"

От анализа больших данных и машинного обучения до автоматизации рутинных процессов и создания интерактивных визуализаций – эта часть станет вашим практическим путеводителем. Вы узнаете, как распределенно обрабатывать данные с помощью Dask и PySpark, строить динамические дашборды с Plotly и Dash, оптимизировать производительность моделей с Cython, и разрабатывать высоконагруженные приложения с использованием Asyncio и CUDA. Кроме того, особое внимание уделено автоматизации задач, включая парсинг данных, обработку документов и создание рабочих процессов с Airflow. Визуализация геоданных, работа с изображениями и звуком, а также современные подходы к тестированию и развертыванию приложений помогут вам интегрировать Python в самые разнообразные проекты. Эта часть предназначена для разработчиков, стремящихся расширить свои навыки и внедрять Python в практические сферы, требующие высокую производительность, автоматизацию и гибкость.

date_range Год издания :

foundation Издательство :Автор

person Автор :

workspaces ISBN :

child_care Возрастное ограничение : 12

update Дата обновления : 29.01.2025


from datetime import datetime, timedelta

# Настройки Kafka

broker = 'localhost:9092'

# Создание консьюмера

consumer = Consumer({

'bootstrap.servers': broker,

'group.id': 'user-actions-group',

'auto.offset.reset': 'earliest'

})

producer = Producer({'bootstrap.servers': broker})

consumer.subscribe(['user_actions'])

# Словарь для отслеживания пользователей

user_login_time = {}

try:

while True:

msg = consumer.poll(1.0)

if msg is None:

continue

if msg.error():

continue

# Преобразуем сообщение в Python-объект

action = json.loads(msg.value().decode('utf-8'))

user_id = action['user_id']

action_type = action['action']

timestamp = datetime.fromisoformat(action['timestamp'])

if action_type == 'login':

user_login_time[user_id] = timestamp

elif action_type == 'purchase' and user_id in user_login_time:

del user_login_time[user_id]

# Проверяем, прошло ли 10 минут

current_time = datetime.now()

for user, login_time in list(user_login_time.items()):

if current_time – login_time > timedelta(minutes=10):

notification = {'user_id': user, 'message': 'Сделайте покупку!'}

producer.produce('notifications', value=json.dumps(notification))

print(f"Уведомление отправлено для пользователя {user}")

del user_login_time[user]

except KeyboardInterrupt:

print("Завершение работы.")

finally:

consumer.close()

```

Объяснение:

– Время входа пользователей сохраняется в словаре.

– Если с момента входа прошло более 10 минут и покупка не совершена, генерируется уведомление.

Эти задачи показывают, как использовать Apache Kafka для решения реальных задач, таких как фильтрация событий, подсчет статистики, агрегация данных и сохранение обработанной информации. Эти примеры помогут вам освоить основные подходы к работе с потоками данных в реальном времени.

1.3 Работа с базами данных: SQLAlchemy и интеграция с Pandas

SQLAlchemy – это мощная библиотека для работы с базами данных в Python. Она предоставляет инструменты для удобного взаимодействия с реляционными базами данных через ORM (Object Relational Mapping) или с использованием чистого SQL.

Pandas же идеально подходит для анализа данных, но иногда данные, которые мы хотим обработать, хранятся в базах данных. Для этого SQLAlchemy и Pandas можно эффективно интегрировать, чтобы выгружать данные из базы, обрабатывать их в Pandas и сохранять обратно.

Установка и подключение

Для начала работы установите библиотеку SQLAlchemy:

```bash

pip install sqlalchemy

```

Если вы используете SQLite, дополнительных действий не требуется. Для других баз данных, таких как PostgreSQL или MySQL, также потребуется установить драйверы, например:

```bash

pip install psycopg2 # Для PostgreSQL

pip install pymysql # Для MySQL

```

Создайте подключение к базе данных с помощью SQLAlchemy. Например, для SQLite это будет выглядеть так:

```python

from sqlalchemy import create_engine

# Создаем подключение к базе данных SQLite

engine = create_engine('sqlite:///example.db', echo=True)

```

Здесь `echo=True` означает, что в консоль будут выводиться SQL-запросы, выполняемые через SQLAlchemy, что полезно для отладки.

Создание таблиц и работа с ORM

SQLAlchemy поддерживает два основных подхода: работа через ORM и использование SQL-запросов напрямую. Рассмотрим оба.

Создадим таблицу для хранения информации о пользователях:

```python

from sqlalchemy import Table, Column, Integer, String, MetaData

# Создаем метаданные

metadata = MetaData()

# Определяем таблицу

users = Table(

'users', metadata,

Column('id', Integer, primary_key=True),

Column('name', String),

Column('age', Integer),

Column('email', String)

)

# Создаем таблицу в базе данных

metadata.create_all(engine)

```

Теперь таблица `users` создана в базе данных.

Для добавления данных используем объект подключения:

```python

from sqlalchemy import insert

# Подключаемся к базе данных

conn = engine.connect()

# Добавляем данные

insert_query = insert(users).values([

{'name': 'Alice', 'age': 25, 'email': 'alice@example.com'},

{'name': 'Bob', 'age': 30, 'email': 'bob@example.com'},

{'name': 'Charlie', 'age': 35, 'email': 'charlie@example.com'}

])

Все книги на сайте предоставены для ознакомления и защищены авторским правом