Initial clean history
CI/CD / Test (push) Failing after 4m32s
CI/CD / Deploy (push) Has been skipped

This commit is contained in:
User
2026-05-11 16:46:25 +03:00
commit bf89e671d8
33 changed files with 1333 additions and 0 deletions
View File
+29
View File
@@ -0,0 +1,29 @@
import schedule
import threading
import time
from loguru import logger
def schedule_daily_backup(exporter):
"""Запускает ежесуточное резервное копирование в отдельном потоке"""
def job():
try:
exporter.backup()
logger.info("Daily backup executed successfully")
except Exception as e:
logger.error(f"Backup failed: {e}")
schedule.every().day.at("03:00").do(job)
def run_scheduler():
while True:
try:
schedule.run_pending()
except Exception as e:
logger.error(f"Scheduler error: {e}")
time.sleep(30) # Уменьшил интервал с 60 до 30 секунд для более точного срабатывания
thread = threading.Thread(target=run_scheduler, daemon=True)
thread.start()
logger.info("Backup scheduler started")
+17
View File
@@ -0,0 +1,17 @@
from loguru import logger
import sys
from config import settings
logger.remove()
logger.add(
sys.stdout,
format="<green>{time:HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan> - {message}",
level=settings.LOG_LEVEL
)
logger.add(
"logs/bot.log",
rotation="1 day",
retention="30 days",
format="{time} | {level} | {message}",
level="DEBUG"
)
+71
View File
@@ -0,0 +1,71 @@
from loguru import logger
from typing import Dict, List
from datetime import datetime, timedelta
class MiddlewareChain:
"""Простая цепочка мидлваров (логирование, антиспам, аналитика)"""
def __init__(self):
self.middlewares: List[callable] = []
def add(self, middleware: callable):
self.middlewares.append(middleware)
def process(self, user_id: int, text: str) -> bool:
for mw in self.middlewares:
if not mw(user_id, text):
return False
return True
# ==================== Антиспам-фильтр ====================
class SpamFilter:
"""Простой антиспам-фильтр: блокирует пользователя, если он отправляет
более `max_messages` сообщений за `window_seconds` секунд."""
def __init__(self, max_messages: int = 5, window_seconds: int = 60):
self.max_messages = max_messages
self.window_seconds = window_seconds
# user_id -> [timestamp1, timestamp2, ...]
self._messages: Dict[int, List[float]] = {}
def is_spam(self, user_id: int) -> bool:
now = datetime.now().timestamp()
window_start = now - self.window_seconds
# Инициализируем список, если нет
if user_id not in self._messages:
self._messages[user_id] = []
# Удаляем старые записи за пределами окна
self._messages[user_id] = [
ts for ts in self._messages[user_id] if ts > window_start
]
# Проверяем, не превышен ли лимит
if len(self._messages[user_id]) >= self.max_messages:
return True
# Записываем текущее сообщение
self._messages[user_id].append(now)
return False
def reset(self, user_id: int):
"""Сброс счётчика для пользователя (например, при /start)"""
self._messages.pop(user_id, None)
# ==================== Логирование ====================
def logging_middleware(user_id: int, text: str) -> bool:
logger.info(f"Processed message from {user_id}: {text}")
return True
# ==================== Пример: мидлвар для аналитики ====================
def analytics_middleware(user_id: int, text: str) -> bool:
"""Заглушка для аналитики — можно расширить позже."""
logger.debug(f"Analytics: user {user_id} sent '{text}'")
return True