Files
QuickBrush/manual_heartbeat.py
2025-12-12 11:19:14 +08:00

269 lines
8.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import requests
import re
import time
import subprocess
import xml.etree.ElementTree as ET
import sys
# ==============================================================================
# 配置区域
# ==============================================================================
# [配置] Cookie
# 请替换为你的登录 Cookie
COOKIE = ""
# [配置] 目标课程和视频ID
# 如果留空,脚本运行时会提示输入
TARGET_COURSE_ID = ""
TARGET_ITEM_ID = ""
# [配置] 心跳间隔(秒)
HEARTBEAT_INTERVAL = 0
# [配置] 每次心跳增加的进度时间(秒)
ADD_TIME = 120
HEADERS = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Cookie": COOKIE
}
# ==============================================================================
# 核心功能函数
# ==============================================================================
def get_video_duration(video_url):
"""
使用 ffprobe 获取视频时长
"""
try:
cmd = [
"ffprobe",
"-v", "error",
"-show_entries", "format=duration",
"-of", "default=noprint_wrappers=1:nokey=1",
video_url
]
result = subprocess.run(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=30)
if result.returncode == 0:
try:
duration = float(result.stdout.strip())
return duration
except ValueError:
print(f" [Error] ffprobe output invalid: {result.stdout}")
return None
else:
print(f" [Error] ffprobe failed: {result.stderr}")
return None
except Exception as e:
print(f" [Error] get_video_duration exception: {e}")
return None
def send_heartbeat(url, user_id, course_id, sco_id, history_id, add_time, current_time, total_time, finished, first_update):
"""
发送单个心跳包
"""
has_check_three = "true" if first_update == "true" else "false"
has_check_two = "false"
try:
if float(total_time) > 0 and float(current_time) > (float(total_time) / 2):
has_check_two = "true"
except:
pass
has_check_one = "true" if str(finished) == "1" else "false"
payload = {
"userId": user_id,
"courseId": course_id,
"scoId": sco_id,
"historyId": history_id,
"addTime": str(add_time),
"totalTime": str(total_time),
"finished": finished,
"currentTime": str(current_time),
"hasCheckOne": has_check_one,
"hasCheckTwo": has_check_two,
"hasCheckThree": has_check_three,
"firstUpdate": first_update
}
max_retries = 3
for attempt in range(max_retries):
try:
resp = requests.post(
url, data=payload, headers=HEADERS, timeout=10)
resp.raise_for_status()
if "<status>1</status>" in resp.text:
return True
elif "<status>-1</status>" in resp.text:
print(" [Warning] 服务器返回 status -1 (可能多端登录或异常)")
return False
return True
except Exception as e:
print(f" [Warning] 请求异常 (尝试 {attempt+1}/{max_retries}): {e}")
if attempt < max_retries - 1:
time.sleep(2)
else:
return False
return False
def process_manual_video(course_id, item_id):
"""
处理单个视频的主逻辑
"""
print(f"\n>>> 开始处理: CourseId={course_id}, ItemId={item_id}")
# 1. 请求视频页
video_page_url = f"https://zjbc.cjnep.net/lms/web/course/view?id={course_id}&itemid={item_id}"
try:
print(" 正在获取视频页面信息...")
resp = requests.get(video_page_url, headers=HEADERS)
resp.raise_for_status()
except Exception as e:
print(f" [Error] 无法访问视频页: {e}")
return
# 2. 提取 config URL
config_match = re.search(r"config:\s*'([^']+)'", resp.text)
if not config_match:
print(" [Error] 未找到 config URL请检查 Cookie 是否过期或课程ID/ItemID是否正确。")
return
config_path = config_match.group(1)
config_url = "https://zjbc.cjnep.net" + config_path
print(f" Config URL: {config_url}")
# 3. 请求 XML 配置
try:
xml_resp = requests.get(config_url, headers=HEADERS)
xml_resp.raise_for_status()
except Exception as e:
print(f" [Error] 无法获取 XML 配置: {e}")
return
# 4. 解析 XML
try:
root = ET.fromstring(xml_resp.text)
user_id = root.findtext("userId")
upd_status_url = root.findtext("updStatusUrl")
total_time_str = root.findtext("totalTime")
history_id = root.findtext("historyId")
finish_status = root.findtext("finish")
# 视频 URL (用于 ffprobe)
video_url = None
file_url_node = root.find("fileUrl")
if file_url_node is not None:
video_url = file_url_node.findtext("normal")
if upd_status_url and not upd_status_url.startswith("http"):
upd_status_url = "https://zjbc.cjnep.net" + upd_status_url
if not (user_id and upd_status_url and history_id):
print(" [Error] XML 解析缺少关键字段")
return
# 检查是否已完成
if finish_status == "true":
print(f" [Info] 服务器显示该视频已完成 (finish=true)。")
# 即使完成了,如果用户强制运行,也可以继续,但通常建议停止
# 这里我们询问一下或者直接继续(为了刷时长?)
# 既然是手动脚本,我们假设用户知道自己在做什么,继续执行,但给个提示
# 获取时长
total_time = 0.0
if total_time_str:
total_time = float(total_time_str)
elif video_url:
print(" [Info] XML 中未找到 totalTime尝试通过 ffprobe 获取...")
duration = get_video_duration(video_url)
if duration:
total_time = duration
print(f" [Info] 获取到视频时长: {total_time}s")
else:
print(" [Error] 无法获取视频时长,无法继续")
return
else:
print(" [Error] 无法确定视频总时长")
return
print(
f" [Info] UserId={user_id}, HistoryId={history_id}, TotalTime={total_time}")
except ET.ParseError:
print(" [Error] XML 解析失败")
return
# 5. 循环发送心跳
# 读取 XML 中的上次观看进度
last_viewed_time_str = root.findtext("lastViewedTime")
current_time = float(last_viewed_time_str) if last_viewed_time_str else 0.0
print(f" [Start] 初始进度: {current_time}s / {total_time}s")
is_first = "true"
while True:
if current_time >= total_time:
print(" [Done] 进度已达到总时长。发送完成包...")
send_heartbeat(upd_status_url, user_id, course_id, item_id, history_id,
add_time=0, current_time=0, total_time=total_time,
finished=1, first_update=is_first)
print(" [Success] 任务完成!")
break
step = ADD_TIME
if current_time + step >= total_time:
step = int(total_time - current_time) + 1
current_time = total_time
finished_flag = 1
post_current_time = 0
else:
current_time += step
finished_flag = 0
post_current_time = current_time
print(
f" [Sending] 发送心跳: 进度 {post_current_time if post_current_time > 0 else total_time}/{total_time} (add {step}s)...")
success = send_heartbeat(upd_status_url, user_id, course_id, item_id, history_id,
add_time=step, current_time=post_current_time, total_time=total_time,
finished=finished_flag, first_update=is_first)
if not success:
print(" [Error] 心跳发送失败,停止。")
break
if finished_flag == 1:
print(" [Done] 视频播放结束。")
break
is_first = "false"
if HEARTBEAT_INTERVAL > 0:
time.sleep(HEARTBEAT_INTERVAL)
if __name__ == "__main__":
print("="*50)
print("手动刷课脚本")
print("="*50)
c_id = TARGET_COURSE_ID
i_id = TARGET_ITEM_ID
if not c_id:
c_id = input("请输入 Course ID (例如 677): ").strip()
if not i_id:
i_id = input("请输入 Item ID (例如 142573): ").strip()
if c_id and i_id:
process_manual_video(c_id, i_id)
else:
print("输入无效,程序退出。")