Files
weather_bot/bot.py
2026-04-02 09:53:14 +03:00

241 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import logging
import os
from datetime import datetime
from aiogram.exceptions import TelegramForbiddenError
from aiogram import Bot, Dispatcher, F, types
from aiogram.filters import Command
from aiogram.fsm.context import FSMContext
from aiogram.fsm.state import State, StatesGroup
from aiogram.types import ReplyKeyboardMarkup, KeyboardButton, ReplyKeyboardRemove
from aiogram.client.default import DefaultBotProperties
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from dotenv import load_dotenv
from weather import (
get_daily_weather_summary,
get_coords_by_city,
get_tomorrow_weather_summary,
get_weekly_weather_summary,
get_timezone_by_coords
)
import database as db # Импортируем нашу базу данных
load_dotenv()
BOT_TOKEN = os.getenv("BOT_TOKEN")
logging.basicConfig(level=logging.INFO)
bot = Bot(token=BOT_TOKEN, default=DefaultBotProperties(parse_mode="Markdown"))
dp = Dispatcher()
scheduler = AsyncIOScheduler()
class WeatherSetup(StatesGroup):
waiting_for_location = State()
waiting_for_time = State()
def get_main_keyboard():
return ReplyKeyboardMarkup(
keyboard=[
[KeyboardButton(text="🌤 Погода сейчас")],
[
KeyboardButton(text="📅 Погода на завтра"),
KeyboardButton(text="📆 Прогноз на неделю")
],
[
KeyboardButton(text="📍 Изменить город"),
KeyboardButton(text="⏰ Изменить время")
]
],
resize_keyboard=True
)
def get_location_keyboard():
return ReplyKeyboardMarkup(
keyboard=[[KeyboardButton(text="📍 Отправить текущую геопозицию", request_location=True)]],
resize_keyboard=True,
input_field_placeholder="Или напишите название города..."
)
async def send_weather_update(user_id: int):
"""Отправка погоды по расписанию"""
user = await db.get_user(user_id)
if not user or not user["lat"]:
return
weather_text = await get_daily_weather_summary(user["lat"], user["lon"])
try:
await bot.send_message(chat_id=user_id, text=weather_text)
except TelegramForbiddenError:
logging.info(f"Пользователь {user_id} заблокировал бота. Удаляем из рассылки.")
scheduler.remove_job(str(user_id))
await db.delete_user(user_id)
except Exception as e:
logging.error(f"Ошибка отправки пользователю {user_id}: {e}")
@dp.message(Command("start"))
async def cmd_start(message: types.Message, state: FSMContext):
await state.set_state(WeatherSetup.waiting_for_location)
await message.answer(
"Привет! Я погодный бот.\n\nОтправь мне свою геопозицию кнопкой ниже или **просто напиши название своего города**:",
reply_markup=get_location_keyboard()
)
@dp.message(F.text == "📍 Изменить город")
async def cmd_change_city(message: types.Message, state: FSMContext):
await state.set_state(WeatherSetup.waiting_for_location)
await message.answer("Напиши название нового города или отправь геопозицию:", reply_markup=get_location_keyboard())
@dp.message(WeatherSetup.waiting_for_location)
async def process_location_input(message: types.Message, state: FSMContext):
user_id = message.from_user.id
city_name_display = "Выбранная локация"
timezone = None
if message.location:
lat = message.location.latitude
lon = message.location.longitude
# Для координат нужно определить часовой пояс отдельным запросом
timezone = await get_timezone_by_coords(lat, lon)
if not timezone:
await message.answer("Не удалось определить часовой пояс для вашей геопозиции. Попробуйте ввести название города.")
return
elif message.text:
lat, lon, found_name, timezone = await get_coords_by_city(message.text)
if not lat:
await message.answer("К сожалению, я не нашел такой город 😔. Попробуй уточнить название.")
return
city_name_display = found_name
else:
return
# Сохраняем локацию и часовой пояс в БД
await db.save_user_location(user_id, lat, lon, timezone)
logging.info(f"Сохранена локация для {user_id}: lat={lat}, lon={lon}, timezone={timezone}")
# Проверяем, есть ли уже у пользователя настроенное время
user = await db.get_user(user_id)
if user and user["time"]:
await message.answer(f"📍 Локация успешно обновлена на: **{city_name_display}**!", reply_markup=get_main_keyboard())
await state.clear()
else:
await state.set_state(WeatherSetup.waiting_for_time)
await message.answer(
f"Отлично! Локация **{city_name_display}** найдена (часовой пояс: {timezone}).\n\nТеперь напиши время для ежедневной рассылки (в формате ЧЧ:ММ):",
reply_markup=ReplyKeyboardRemove()
)
@dp.message(F.text == "⏰ Изменить время")
async def cmd_change_time(message: types.Message, state: FSMContext):
user = await db.get_user(message.from_user.id)
if not user:
await message.answer("Сначала отправьте локацию через команду /start.")
return
await state.set_state(WeatherSetup.waiting_for_time)
await message.answer("Введите новое время для ежедневного прогноза (в формате ЧЧ:ММ):", reply_markup=ReplyKeyboardRemove())
@dp.message(WeatherSetup.waiting_for_time, F.text)
async def process_time(message: types.Message, state: FSMContext):
time_text = message.text.strip()
user_id = message.from_user.id
try:
dt = datetime.strptime(time_text, "%H:%M")
# Получаем пользователя, чтобы узнать его часовой пояс
user = await db.get_user(user_id)
if not user or not user["timezone"]:
await message.answer("Не удалось найти ваш часовой пояс. Пожалуйста, установите город заново (команда /start).")
return
# Сохраняем время в БД
await db.update_user_time(user_id, time_text)
# Обновляем или создаем задачу. id задачи = строковый user_id.
# replace_existing=True автоматически заменит старую задачу, если она была.
scheduler.add_job(
send_weather_update,
trigger='cron',
hour=dt.hour, minute=dt.minute,
timezone=user["timezone"],
args=[user_id],
id=str(user_id),
replace_existing=True
)
await message.answer(f"✅ Готово! Ежедневный прогноз установлен на {time_text}.", reply_markup=get_main_keyboard())
await state.clear()
except ValueError:
await message.answer("Неверный формат времени. Пожалуйста, используй формат ЧЧ:ММ.")
@dp.message(F.text == "🌤 Погода сейчас")
async def cmd_weather_now(message: types.Message):
user = await db.get_user(message.from_user.id)
if not user or not user["lat"]:
await message.answer("Сначала отправьте локацию через команду /start.")
return
loading_msg = await message.answer("⏳ Узнаю погоду...")
weather_text = await get_daily_weather_summary(user["lat"], user["lon"])
await loading_msg.edit_text(weather_text)
@dp.message(F.text == "📅 Погода на завтра")
async def cmd_weather_tomorrow(message: types.Message):
user = await db.get_user(message.from_user.id)
if not user or not user["lat"]:
await message.answer("Сначала отправьте локацию через команду /start.")
return
loading_msg = await message.answer("⏳ Узнаю прогноз на завтра...")
weather_text = await get_tomorrow_weather_summary(user["lat"], user["lon"])
await loading_msg.edit_text(weather_text)
@dp.message(F.text == "📆 Прогноз на неделю")
async def cmd_weather_week(message: types.Message):
user = await db.get_user(message.from_user.id)
if not user or not user["lat"]:
await message.answer("Сначала отправьте локацию через команду /start.")
return
loading_msg = await message.answer("⏳ Собираю прогноз на 7 дней...")
weather_text = await get_weekly_weather_summary(user["lat"], user["lon"])
await loading_msg.edit_text(weather_text)
# --- ВОССТАНОВЛЕНИЕ ПЛАНИРОВЩИКА ---
async def restore_jobs():
"""Загружает задачи из БД в планировщик после перезапуска бота"""
users = await db.get_all_users_with_time()
count = 0
for user in users:
try:
dt = datetime.strptime(user["time"], "%H:%M")
scheduler.add_job(
send_weather_update,
trigger='cron',
hour=dt.hour, minute=dt.minute,
timezone=user["timezone"],
args=[user["user_id"]],
id=str(user["user_id"]),
replace_existing=True
)
count += 1
except Exception as e:
logging.error(f"Не удалось восстановить задачу для {user['user_id']}: {e}")
logging.info(f"Восстановлено задач из БД: {count}")
async def main():
await db.init_db() # 1. Инициализируем БД
await restore_jobs() # 2. Восстанавливаем расписание из БД
scheduler.start() # 3. Запускаем планировщик
await bot.delete_webhook(drop_pending_updates=True)
await dp.start_polling(bot) # 4. Запускаем бота
if __name__ == "__main__":
asyncio.run(main())