1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126
| import time from datetime import datetime, timezone, timedelta import requests import logging from rich.logging import RichHandler from rich.status import Status
# Seconds to sleep between two polls of the notice API.
INTERVAL = 2
# Log record format handed to logging.basicConfig (message text only).
FORMAT = "%(message)s"
# Key inserted into the WeCom webhook URL by WXWorkBot; empty here, so it
# has to be filled in before the script can deliver anything.
WEBHOOK_KEY = ''
# Notice endpoint that is polled; the host part is a placeholder
# ("site or IP") that must be replaced with the real platform address.
API_URL = 'http://网站或者ip/api/game/1/notices'

# %-style message templates keyed by the notice's 'type' field.
# The last %s of every template receives the formatted notice time; the
# preceding %s placeholders are filled, in order, from the notice's 'values'.
TEMPLATES = {
    # General announcement: content, time.
    'Normal': ' 比赛公告\n内容: %s\n时间: %s',
    # Newly published challenge: challenge name, time.
    'NewChallenge': ' 新增题目\n题目名称: %s\n发布时间: %s',
    # New hint on a challenge: challenge name, time.
    'NewHint': ' 题目提示\n题目名称: %s 有新提示\n更新时间: %s',
    # First / second / third solve: player, challenge name, time.
    'FirstBlood': ' 一血播报\n选手: %s\n攻破题目: %s\n时间: %s',
    'SecondBlood': '⚔️ 二血播报\n选手: %s\n攻破题目: %s\n时间: %s',
    'ThirdBlood': '⚡ 三血播报\n选手: %s\n攻破题目: %s\n时间: %s'
}
class WXWorkBot:
    """Minimal client for a WeCom (WeChat Work) group-robot webhook."""

    def __init__(self, webhook_key: str, timeout: float = 10):
        """Build the webhook URL for one robot.

        Args:
            webhook_key: Value of the ``key`` query parameter of the robot's
                webhook URL.
            timeout: Seconds to wait for the webhook HTTP request. Without a
                timeout a stalled connection would block the caller forever.
        """
        self.webhook_url = f"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={webhook_key}"
        self.timeout = timeout

    @staticmethod
    def _is_accepted(body) -> bool:
        """Return True when the webhook's decoded JSON reply reports errcode 0."""
        return isinstance(body, dict) and body.get("errcode") == 0

    def send_markdown(self, content: str) -> bool:
        """Send *content* to the webhook and report whether it was accepted.

        Despite the method name (kept for existing callers), the payload uses
        ``msgtype: "text"``, so the content is delivered as plain text.

        Args:
            content: Message text to deliver.

        Returns:
            True only when the HTTP request succeeded and the JSON reply
            carries ``errcode == 0``; False otherwise. Every failure is
            logged and never raised.
        """
        payload = {
            "msgtype": "text",
            "text": {
                "content": content
            }
        }
        try:
            response = requests.post(
                self.webhook_url, json=payload, timeout=self.timeout
            )
            response.raise_for_status()
            body = response.json()
        except Exception as e:
            # Boundary handler: this method's contract is to return False
            # instead of raising, whatever goes wrong while sending.
            logging.error(f"消息发送失败: {str(e)}")
            return False

        # The webhook answers HTTP 200 even when it rejects the message
        # (invalid key, rate limit, ...); the verdict is in the JSON body.
        if not self._is_accepted(body):
            logging.error(f"消息发送失败: {body}")
            return False
        return True
def process_timestamp(ts: int) -> str:
    """Format a millisecond Unix timestamp as Beijing time (UTC+8).

    Args:
        ts: Milliseconds since the Unix epoch.

    Returns:
        The corresponding wall-clock time in UTC+8, formatted as
        ``YYYY-MM-DD HH:MM:SS``.
    """
    beijing = timezone(timedelta(hours=8))
    # Build the aware datetime directly in the target zone;
    # datetime.utcfromtimestamp() is deprecated since Python 3.12.
    local_time = datetime.fromtimestamp(ts / 1000, tz=beijing)
    return local_time.strftime("%Y-%m-%d %H:%M:%S")
# Seconds to wait for the notice API before a request counts as failed.
REQUEST_TIMEOUT = 10
# Seconds to pause after a failed poll before trying again.
ERROR_BACKOFF = 5
# Delivery attempts (one per poll) granted to a notice before it is skipped,
# so a message the webhook keeps rejecting cannot block all later notices.
# With INTERVAL = 2 this is roughly one minute of retries.
MAX_SEND_ATTEMPTS = 30


def _fetch_notices() -> list:
    """Return all notices from API_URL, ordered by ascending ``id``.

    Raises:
        requests.RequestException: On network failure, timeout or a non-2xx
            reply.
        Exception: Whatever decoding or sorting raises when the reply is not
            a JSON list of objects carrying an ``id``.
    """
    response = requests.get(API_URL, timeout=REQUEST_TIMEOUT)
    response.raise_for_status()
    return sorted(response.json(), key=lambda item: item['id'])


def _build_message(notice: dict):
    """Render *notice* through the TEMPLATES entry for its type.

    Returns:
        The message text, or None when ``notice['values']`` holds fewer
        entries than the template needs.

    Raises:
        KeyError: When ``time``, ``values`` or ``type`` is missing, or the
            type has no template.
        ValueError: When ``time`` cannot be converted to an integer.
    """
    raw_time = notice['time']
    timestamp = raw_time if isinstance(raw_time, int) else int(raw_time)
    formatted_time = process_timestamp(timestamp)

    values = notice['values']
    if not isinstance(values, list):
        values = [str(values)]

    template = TEMPLATES[notice['type']]
    # The final placeholder of every template is reserved for the time.
    required_params = template.count('%s') - 1
    if len(values) < required_params:
        return None
    return template % tuple(values[:required_params] + [formatted_time])


def main() -> None:
    """Poll the notice API and forward each new notice to the WeCom robot.

    Notices that already exist at start-up are not sent. New notices are
    delivered in ascending id order. A notice whose delivery fails is retried
    on later polls, before any newer notice, and is skipped only after
    MAX_SEND_ATTEMPTS failures. Notices that cannot be rendered are logged
    once and skipped.
    """
    logging.basicConfig(
        level=logging.INFO,
        format=FORMAT,
        datefmt="[%X]",
        handlers=[RichHandler()],
    )
    log = logging.getLogger("rich")
    wx_bot = WXWorkBot(WEBHOOK_KEY)

    # Everything up to the newest existing notice counts as already seen.
    try:
        notices = _fetch_notices()
        last_id = notices[-1]['id'] if notices else 0
    except Exception as e:
        log.error(f"初始化失败: {str(e)}")
        raise SystemExit(1)
    log.info(f"初始最新通知ID: {last_id}")

    send_failures = 0  # consecutive failed deliveries of the oldest unsent notice
    status = Status('Waiting for new notice')
    status.start()
    try:
        while True:
            try:
                for notice in _fetch_notices():
                    notice_id = notice['id']
                    if notice_id <= last_id:
                        continue

                    try:
                        message = _build_message(notice)
                    except KeyError as e:
                        log.error(f"字段缺失: {str(e)} 通知内容: {notice}")
                        message = None
                    except Exception as e:
                        log.error(f"处理通知异常: {str(e)} 通知内容: {notice}")
                        message = None
                    else:
                        if message is None:
                            log.warning(f"参数不足的通知: ID={notice['id']} Type={notice['type']}")

                    if message is None:
                        # The notice cannot be rendered, so move past it
                        # instead of logging the same problem on every poll.
                        last_id = notice_id
                        continue

                    if wx_bot.send_markdown(message):
                        last_id = notice_id
                        send_failures = 0
                        log.info(f"成功发送通知 ID: {notice['id']}")
                        continue

                    send_failures += 1
                    if send_failures < MAX_SEND_ATTEMPTS:
                        # Stop here so this notice is retried first on the
                        # next poll and is never overtaken by a later id.
                        break
                    log.error(f"通知 ID: {notice_id} 连续 {send_failures} 次发送失败, 已跳过")
                    last_id = notice_id
                    send_failures = 0

                time.sleep(INTERVAL)
            except requests.RequestException as e:
                log.error(f"API请求异常: {str(e)}")
                time.sleep(ERROR_BACKOFF)
            except Exception as e:
                log.error(f"未知错误: {str(e)}")
                time.sleep(ERROR_BACKOFF)
    except KeyboardInterrupt:
        # Caught outside the loop so Ctrl-C is also handled while the
        # script is sleeping inside one of the error handlers above.
        log.info('程序已手动终止')
    finally:
        status.stop()


if __name__ == '__main__':
    main()
|