import requests import re import time import subprocess import xml.etree.ElementTree as ET import sys # ============================================================================== # 配置区域 # ============================================================================== # [配置] Cookie # 请替换为你的登录 Cookie COOKIE = "" # [配置] 目标课程和视频ID # 如果留空,脚本运行时会提示输入 TARGET_COURSE_ID = "" TARGET_ITEM_ID = "" # [配置] 心跳间隔(秒) HEARTBEAT_INTERVAL = 0 # [配置] 每次心跳增加的进度时间(秒) ADD_TIME = 120 HEADERS = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", "Cookie": COOKIE } # ============================================================================== # 核心功能函数 # ============================================================================== def get_video_duration(video_url): """ 使用 ffprobe 获取视频时长 """ try: cmd = [ "ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", video_url ] result = subprocess.run( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=30) if result.returncode == 0: try: duration = float(result.stdout.strip()) return duration except ValueError: print(f" [Error] ffprobe output invalid: {result.stdout}") return None else: print(f" [Error] ffprobe failed: {result.stderr}") return None except Exception as e: print(f" [Error] get_video_duration exception: {e}") return None def send_heartbeat(url, user_id, course_id, sco_id, history_id, add_time, current_time, total_time, finished, first_update): """ 发送单个心跳包 """ has_check_three = "true" if first_update == "true" else "false" has_check_two = "false" try: if float(total_time) > 0 and float(current_time) > (float(total_time) / 2): has_check_two = "true" except: pass has_check_one = "true" if str(finished) == "1" else "false" payload = { "userId": user_id, "courseId": course_id, "scoId": sco_id, "historyId": history_id, "addTime": str(add_time), "totalTime": str(total_time), "finished": finished, "currentTime": str(current_time), "hasCheckOne": has_check_one, "hasCheckTwo": has_check_two, "hasCheckThree": has_check_three, "firstUpdate": first_update } max_retries = 3 for attempt in range(max_retries): try: resp = requests.post( url, data=payload, headers=HEADERS, timeout=10) resp.raise_for_status() if "1" in resp.text: return True elif "-1" in resp.text: print(" [Warning] 服务器返回 status -1 (可能多端登录或异常)") return False return True except Exception as e: print(f" [Warning] 请求异常 (尝试 {attempt+1}/{max_retries}): {e}") if attempt < max_retries - 1: time.sleep(2) else: return False return False def process_manual_video(course_id, item_id): """ 处理单个视频的主逻辑 """ print(f"\n>>> 开始处理: CourseId={course_id}, ItemId={item_id}") # 1. 请求视频页 video_page_url = f"https://zjbc.cjnep.net/lms/web/course/view?id={course_id}&itemid={item_id}" try: print(" 正在获取视频页面信息...") resp = requests.get(video_page_url, headers=HEADERS) resp.raise_for_status() except Exception as e: print(f" [Error] 无法访问视频页: {e}") return # 2. 提取 config URL config_match = re.search(r"config:\s*'([^']+)'", resp.text) if not config_match: print(" [Error] 未找到 config URL,请检查 Cookie 是否过期或课程ID/ItemID是否正确。") return config_path = config_match.group(1) config_url = "https://zjbc.cjnep.net" + config_path print(f" Config URL: {config_url}") # 3. 请求 XML 配置 try: xml_resp = requests.get(config_url, headers=HEADERS) xml_resp.raise_for_status() except Exception as e: print(f" [Error] 无法获取 XML 配置: {e}") return # 4. 解析 XML try: root = ET.fromstring(xml_resp.text) user_id = root.findtext("userId") upd_status_url = root.findtext("updStatusUrl") total_time_str = root.findtext("totalTime") history_id = root.findtext("historyId") finish_status = root.findtext("finish") # 视频 URL (用于 ffprobe) video_url = None file_url_node = root.find("fileUrl") if file_url_node is not None: video_url = file_url_node.findtext("normal") if upd_status_url and not upd_status_url.startswith("http"): upd_status_url = "https://zjbc.cjnep.net" + upd_status_url if not (user_id and upd_status_url and history_id): print(" [Error] XML 解析缺少关键字段") return # 检查是否已完成 if finish_status == "true": print(f" [Info] 服务器显示该视频已完成 (finish=true)。") # 即使完成了,如果用户强制运行,也可以继续,但通常建议停止 # 这里我们询问一下或者直接继续(为了刷时长?) # 既然是手动脚本,我们假设用户知道自己在做什么,继续执行,但给个提示 # 获取时长 total_time = 0.0 if total_time_str: total_time = float(total_time_str) elif video_url: print(" [Info] XML 中未找到 totalTime,尝试通过 ffprobe 获取...") duration = get_video_duration(video_url) if duration: total_time = duration print(f" [Info] 获取到视频时长: {total_time}s") else: print(" [Error] 无法获取视频时长,无法继续") return else: print(" [Error] 无法确定视频总时长") return print( f" [Info] UserId={user_id}, HistoryId={history_id}, TotalTime={total_time}") except ET.ParseError: print(" [Error] XML 解析失败") return # 5. 循环发送心跳 # 读取 XML 中的上次观看进度 last_viewed_time_str = root.findtext("lastViewedTime") current_time = float(last_viewed_time_str) if last_viewed_time_str else 0.0 print(f" [Start] 初始进度: {current_time}s / {total_time}s") is_first = "true" while True: if current_time >= total_time: print(" [Done] 进度已达到总时长。发送完成包...") send_heartbeat(upd_status_url, user_id, course_id, item_id, history_id, add_time=0, current_time=0, total_time=total_time, finished=1, first_update=is_first) print(" [Success] 任务完成!") break step = ADD_TIME if current_time + step >= total_time: step = int(total_time - current_time) + 1 current_time = total_time finished_flag = 1 post_current_time = 0 else: current_time += step finished_flag = 0 post_current_time = current_time print( f" [Sending] 发送心跳: 进度 {post_current_time if post_current_time > 0 else total_time}/{total_time} (add {step}s)...") success = send_heartbeat(upd_status_url, user_id, course_id, item_id, history_id, add_time=step, current_time=post_current_time, total_time=total_time, finished=finished_flag, first_update=is_first) if not success: print(" [Error] 心跳发送失败,停止。") break if finished_flag == 1: print(" [Done] 视频播放结束。") break is_first = "false" if HEARTBEAT_INTERVAL > 0: time.sleep(HEARTBEAT_INTERVAL) if __name__ == "__main__": print("="*50) print("手动刷课脚本") print("="*50) c_id = TARGET_COURSE_ID i_id = TARGET_ITEM_ID if not c_id: c_id = input("请输入 Course ID (例如 677): ").strip() if not i_id: i_id = input("请输入 Item ID (例如 142573): ").strip() if c_id and i_id: process_manual_video(c_id, i_id) else: print("输入无效,程序退出。")