下面的代码来自给晓冰做 工商信息截图 时制作的 bot。
当时使用的 python-telegram-bot
库是 1.11 版,但是 1.12 已经进入了 beta。1.12 对 API 做了 比较多的调整。下面的代码可能不如官方文档有参考价值。
同时注意下代码中使用 socks5h://
的原因。
Bot Service
import logging
import requests
from celery import chain
from requests import RequestException
from telegram import InlineKeyboardMarkup, InlineKeyboardButton
from telegram.ext import Updater, CommandHandler, Filters, MessageHandler, CallbackQueryHandler
import utils
from tasks import capture, send_mail, on_error
from utils import TELEGRAM_TOKEN
REQUEST_KWARGS = {
'proxy_url': 'socks5h://127.0.0.1:1080',
}
logger = logging.getLogger('telegram_app')
updater = Updater(token=TELEGRAM_TOKEN, request_kwargs=REQUEST_KWARGS)
dispatcher = updater.dispatcher
queue = updater.job_queue
def start(bot, update):
markup = InlineKeyboardMarkup(
[[InlineKeyboardButton('截工商信息', callback_data='screenshot_instruction')]]
)
bot.send_message(chat_id=update.message.chat_id, text="欢迎使用 🌈⛈🎉🌹🐧😊", reply_markup=markup)
def test(bot, update, args):
company_name = ''.join(args).strip()
(capture.si(company_name, update.message.chat_id))()
def screenshot_instruction(bot, update):
query = update.callback_query
bot.edit_message_text(text="请直接输入公司名称🐧",
chat_id=query.message.chat_id,
message_id=query.message.message_id)
def kiss(bot, update):
query = update.callback_query
bot.edit_message_text(text="😘😘😘",
chat_id=query.message.chat_id,
message_id=query.message.message_id)
def screenshot(bot, update):
company_name = update.message.text.strip()
s = requests.Session()
ua_headers = {
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36"}
# 种 Cookie
r = s.get('https://app02.szmqs.gov.cn/outer/entSelect/gs.html', headers=ua_headers)
r.raise_for_status()
# 查公司存不存在
url = 'https://app02.szmqs.gov.cn/outer/entEnt/detail.do'
headers = dict(
Origin='https://app02.szmqs.gov.cn',
Referer='https://app02.szmqs.gov.cn/outer/entSelect/gs.html',
**ua_headers
)
data = {'unifsocicrediden': '', 'entname': company_name, 'flag': 1}
r = s.post(url, headers=headers, data=data)
chat_id = update.message.chat_id
try:
r.raise_for_status()
# 不存在时返回 {"data":[{"data":[],"name":"data","vtype":"attr"}]}
if not r.json()['data'][0]['data']:
bot.send_message(chat_id=chat_id, text="公司不存在,请确定输入是否正确 😞")
else:
chain(
capture.si(company_name, chat_id).on_error(on_error.s()),
send_mail.si(company_name, chat_id).on_error(on_error.s())
)()
bot.send_message(chat_id=chat_id, text="截图抓取中,稍后以邮件发送结果🐧")
except (RequestException, KeyError) as e:
logger.exception(e)
bot.send_message(chat_id=chat_id, text="查询公司失败,请重试 😞")
def main():
dispatcher.add_handler(CommandHandler('start', start))
dispatcher.add_handler(CommandHandler('test', test, pass_args=True))
dispatcher.add_handler(CallbackQueryHandler(screenshot_instruction, pattern='screenshot_instruction'))
dispatcher.add_handler(CallbackQueryHandler(kiss, pattern='heart'))
dispatcher.add_handler(MessageHandler(Filters.text, screenshot))
updater.start_polling()
if __name__ == '__main__':
utils.setup_logging('telegram_app')
main()
Sending Message Script
import telegram
from telegram.utils.request import Request
# bot 发消息的脚本
bot = telegram.Bot(token='token', request=Request(proxy_url='socks5h://user:pass@host:port'))
bot.send_message(chat_id='chat_id', text='hello')