ISBN :
Возрастное ограничение : 12
Дата обновления : 29.01.2025
from datetime import datetime, timedelta
# Настройки Kafka
broker = 'localhost:9092'
# Создание консьюмера
consumer = Consumer({
'bootstrap.servers': broker,
'group.id': 'user-actions-group',
'auto.offset.reset': 'earliest'
})
producer = Producer({'bootstrap.servers': broker})
consumer.subscribe(['user_actions'])
# Словарь для отслеживания пользователей
user_login_time = {}
try:
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
continue
# Преобразуем сообщение в Python-объект
action = json.loads(msg.value().decode('utf-8'))
user_id = action['user_id']
action_type = action['action']
timestamp = datetime.fromisoformat(action['timestamp'])
if action_type == 'login':
user_login_time[user_id] = timestamp
elif action_type == 'purchase' and user_id in user_login_time:
del user_login_time[user_id]
# Проверяем, прошло ли 10 минут
current_time = datetime.now()
for user, login_time in list(user_login_time.items()):
if current_time – login_time > timedelta(minutes=10):
notification = {'user_id': user, 'message': 'Сделайте покупку!'}
producer.produce('notifications', value=json.dumps(notification))
print(f"Уведомление отправлено для пользователя {user}")
del user_login_time[user]
except KeyboardInterrupt:
print("Завершение работы.")
finally:
consumer.close()
```
Объяснение:
– Время входа пользователей сохраняется в словаре.
– Если с момента входа прошло более 10 минут и покупка не совершена, генерируется уведомление.
Эти задачи показывают, как использовать Apache Kafka для решения реальных задач, таких как фильтрация событий, подсчет статистики, агрегация данных и сохранение обработанной информации. Эти примеры помогут вам освоить основные подходы к работе с потоками данных в реальном времени.
1.3 Работа с базами данных: SQLAlchemy и интеграция с Pandas
SQLAlchemy – это мощная библиотека для работы с базами данных в Python. Она предоставляет инструменты для удобного взаимодействия с реляционными базами данных через ORM (Object Relational Mapping) или с использованием чистого SQL.
Pandas же идеально подходит для анализа данных, но иногда данные, которые мы хотим обработать, хранятся в базах данных. Для этого SQLAlchemy и Pandas можно эффективно интегрировать, чтобы выгружать данные из базы, обрабатывать их в Pandas и сохранять обратно.
Установка и подключение
Для начала работы установите библиотеку SQLAlchemy:
```bash
pip install sqlalchemy
```
Если вы используете SQLite, дополнительных действий не требуется. Для других баз данных, таких как PostgreSQL или MySQL, также потребуется установить драйверы, например:
```bash
pip install psycopg2 # Для PostgreSQL
pip install pymysql # Для MySQL
```
Создайте подключение к базе данных с помощью SQLAlchemy. Например, для SQLite это будет выглядеть так:
```python
from sqlalchemy import create_engine
# Создаем подключение к базе данных SQLite
engine = create_engine('sqlite:///example.db', echo=True)
```
Здесь `echo=True` означает, что в консоль будут выводиться SQL-запросы, выполняемые через SQLAlchemy, что полезно для отладки.
Создание таблиц и работа с ORM
SQLAlchemy поддерживает два основных подхода: работа через ORM и использование SQL-запросов напрямую. Рассмотрим оба.
Создадим таблицу для хранения информации о пользователях:
```python
from sqlalchemy import Table, Column, Integer, String, MetaData
# Создаем метаданные
metadata = MetaData()
# Определяем таблицу
users = Table(
'users', metadata,
Column('id', Integer, primary_key=True),
Column('name', String),
Column('age', Integer),
Column('email', String)
)
# Создаем таблицу в базе данных
metadata.create_all(engine)
```
Теперь таблица `users` создана в базе данных.
Для добавления данных используем объект подключения:
```python
from sqlalchemy import insert
# Подключаемся к базе данных
conn = engine.connect()
# Добавляем данные
insert_query = insert(users).values([
{'name': 'Alice', 'age': 25, 'email': 'alice@example.com'},
{'name': 'Bob', 'age': 30, 'email': 'bob@example.com'},
{'name': 'Charlie', 'age': 35, 'email': 'charlie@example.com'}
])
Все книги на сайте предоставены для ознакомления и защищены авторским правом