From 5feede6030dc6236144f874755f62fed812745c6 Mon Sep 17 00:00:00 2001 From: NCJOAQ <2627723488@qq.com> Date: Fri, 12 Dec 2025 11:19:14 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E8=A7=86=E9=A2=91=E9=95=BF?= =?UTF-8?q?=E5=BA=A6=E8=8E=B7=E5=8F=96=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- main.py | 38 +++++-- manual_heartbeat.py | 268 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 296 insertions(+), 10 deletions(-) create mode 100644 manual_heartbeat.py diff --git a/main.py b/main.py index bf63ab2..f4cb322 100644 --- a/main.py +++ b/main.py @@ -23,7 +23,7 @@ progress_cache_file = "progress.json" # [配置] 心跳间隔(秒) # 警告:建议设置在 60 秒以上,过快的心跳可能面临风险! -HEARTBEAT_INTERVAL = 1 +HEARTBEAT_INTERVAL = 0 # [配置] 每次心跳增加的进度时间(秒) ADD_TIME = 120 @@ -161,6 +161,16 @@ def process_video(course_id, item_id): print(" [Error] XML 解析缺少关键字段 (userId/updStatusUrl/historyId)") return + # 读取本地缓存 + local_progress = load_progress().get(f"{course_id}_{item_id}", {}) + local_finished = local_progress.get("finished", False) + + # 优先检查是否已完成,如果已完成则跳过获取时长(避免 ffprobe) + if local_finished or finish_status == "true": + print( + f" [Skip] 视频已完成 (XML Finish: {finish_status}, Local Finish: {local_finished})") + return + # 获取时长 total_time = 0.0 if total_time_str: @@ -188,10 +198,7 @@ def process_video(course_id, item_id): # 5. 循环发送心跳 # 初始进度:优先从本地缓存读取,其次从 XML 读取 - # 读取本地缓存 - local_progress = load_progress().get(f"{course_id}_{item_id}", {}) local_time = local_progress.get("current_time", 0.0) - local_finished = local_progress.get("finished", False) # 读取 XML 进度 last_viewed_time_str = root.findtext("lastViewedTime") @@ -200,10 +207,10 @@ def process_video(course_id, item_id): # 选取最大的进度 current_time = max(local_time, xml_time) - # 如果本地标记已完成,或者 XML 标记已完成,或者进度已达标,则跳过 - if local_finished or finish_status == "true" or current_time >= total_time: + # 如果进度已达标,则跳过 + if current_time >= total_time: print( - f" [Skip] 视频已完成 (Progress: {current_time}/{total_time}, XML Finish: {finish_status})") + f" [Skip] 视频已完成 (Progress: {current_time}/{total_time})") return print(f" [Start] 当前进度: {current_time}s / {total_time}s") @@ -263,6 +270,17 @@ def send_heartbeat(url, user_id, course_id, sco_id, history_id, add_time, curren """ 发送单个心跳包,带重试机制 """ + has_check_three = "true" if first_update == "true" else "false" + + has_check_two = "false" + try: + if float(total_time) > 0 and float(current_time) > (float(total_time) / 2): + has_check_two = "true" + except: + pass + + has_check_one = "true" if str(finished) == "1" else "false" + payload = { "userId": user_id, "courseId": course_id, @@ -272,9 +290,9 @@ def send_heartbeat(url, user_id, course_id, sco_id, history_id, add_time, curren "totalTime": str(total_time), "finished": finished, "currentTime": str(current_time), - "hasCheckOne": "false", - "hasCheckTwo": "false", - "hasCheckThree": "false", + "hasCheckOne": has_check_one, + "hasCheckTwo": has_check_two, + "hasCheckThree": has_check_three, "firstUpdate": first_update } diff --git a/manual_heartbeat.py b/manual_heartbeat.py new file mode 100644 index 0000000..334b57d --- /dev/null +++ b/manual_heartbeat.py @@ -0,0 +1,268 @@ +import requests +import re +import time +import subprocess +import xml.etree.ElementTree as ET +import sys + +# ============================================================================== +# 配置区域 +# ============================================================================== + +# [配置] Cookie +# 请替换为你的登录 Cookie +COOKIE = "" + +# [配置] 目标课程和视频ID +# 如果留空,脚本运行时会提示输入 +TARGET_COURSE_ID = "" +TARGET_ITEM_ID = "" + +# [配置] 心跳间隔(秒) +HEARTBEAT_INTERVAL = 0 + +# [配置] 每次心跳增加的进度时间(秒) +ADD_TIME = 120 + +HEADERS = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", + "Cookie": COOKIE +} + +# ============================================================================== +# 核心功能函数 +# ============================================================================== + + +def get_video_duration(video_url): + """ + 使用 ffprobe 获取视频时长 + """ + try: + cmd = [ + "ffprobe", + "-v", "error", + "-show_entries", "format=duration", + "-of", "default=noprint_wrappers=1:nokey=1", + video_url + ] + result = subprocess.run( + cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=30) + if result.returncode == 0: + try: + duration = float(result.stdout.strip()) + return duration + except ValueError: + print(f" [Error] ffprobe output invalid: {result.stdout}") + return None + else: + print(f" [Error] ffprobe failed: {result.stderr}") + return None + except Exception as e: + print(f" [Error] get_video_duration exception: {e}") + return None + + +def send_heartbeat(url, user_id, course_id, sco_id, history_id, add_time, current_time, total_time, finished, first_update): + """ + 发送单个心跳包 + """ + has_check_three = "true" if first_update == "true" else "false" + has_check_two = "false" + try: + if float(total_time) > 0 and float(current_time) > (float(total_time) / 2): + has_check_two = "true" + except: + pass + + has_check_one = "true" if str(finished) == "1" else "false" + + payload = { + "userId": user_id, + "courseId": course_id, + "scoId": sco_id, + "historyId": history_id, + "addTime": str(add_time), + "totalTime": str(total_time), + "finished": finished, + "currentTime": str(current_time), + "hasCheckOne": has_check_one, + "hasCheckTwo": has_check_two, + "hasCheckThree": has_check_three, + "firstUpdate": first_update + } + + max_retries = 3 + for attempt in range(max_retries): + try: + resp = requests.post( + url, data=payload, headers=HEADERS, timeout=10) + resp.raise_for_status() + if "1" in resp.text: + return True + elif "-1" in resp.text: + print(" [Warning] 服务器返回 status -1 (可能多端登录或异常)") + return False + return True + except Exception as e: + print(f" [Warning] 请求异常 (尝试 {attempt+1}/{max_retries}): {e}") + if attempt < max_retries - 1: + time.sleep(2) + else: + return False + return False + + +def process_manual_video(course_id, item_id): + """ + 处理单个视频的主逻辑 + """ + print(f"\n>>> 开始处理: CourseId={course_id}, ItemId={item_id}") + + # 1. 请求视频页 + video_page_url = f"https://zjbc.cjnep.net/lms/web/course/view?id={course_id}&itemid={item_id}" + try: + print(" 正在获取视频页面信息...") + resp = requests.get(video_page_url, headers=HEADERS) + resp.raise_for_status() + except Exception as e: + print(f" [Error] 无法访问视频页: {e}") + return + + # 2. 提取 config URL + config_match = re.search(r"config:\s*'([^']+)'", resp.text) + if not config_match: + print(" [Error] 未找到 config URL,请检查 Cookie 是否过期或课程ID/ItemID是否正确。") + return + + config_path = config_match.group(1) + config_url = "https://zjbc.cjnep.net" + config_path + print(f" Config URL: {config_url}") + + # 3. 请求 XML 配置 + try: + xml_resp = requests.get(config_url, headers=HEADERS) + xml_resp.raise_for_status() + except Exception as e: + print(f" [Error] 无法获取 XML 配置: {e}") + return + + # 4. 解析 XML + try: + root = ET.fromstring(xml_resp.text) + user_id = root.findtext("userId") + upd_status_url = root.findtext("updStatusUrl") + total_time_str = root.findtext("totalTime") + history_id = root.findtext("historyId") + finish_status = root.findtext("finish") + + # 视频 URL (用于 ffprobe) + video_url = None + file_url_node = root.find("fileUrl") + if file_url_node is not None: + video_url = file_url_node.findtext("normal") + + if upd_status_url and not upd_status_url.startswith("http"): + upd_status_url = "https://zjbc.cjnep.net" + upd_status_url + + if not (user_id and upd_status_url and history_id): + print(" [Error] XML 解析缺少关键字段") + return + + # 检查是否已完成 + if finish_status == "true": + print(f" [Info] 服务器显示该视频已完成 (finish=true)。") + # 即使完成了,如果用户强制运行,也可以继续,但通常建议停止 + # 这里我们询问一下或者直接继续(为了刷时长?) + # 既然是手动脚本,我们假设用户知道自己在做什么,继续执行,但给个提示 + + # 获取时长 + total_time = 0.0 + if total_time_str: + total_time = float(total_time_str) + elif video_url: + print(" [Info] XML 中未找到 totalTime,尝试通过 ffprobe 获取...") + duration = get_video_duration(video_url) + if duration: + total_time = duration + print(f" [Info] 获取到视频时长: {total_time}s") + else: + print(" [Error] 无法获取视频时长,无法继续") + return + else: + print(" [Error] 无法确定视频总时长") + return + + print( + f" [Info] UserId={user_id}, HistoryId={history_id}, TotalTime={total_time}") + + except ET.ParseError: + print(" [Error] XML 解析失败") + return + + # 5. 循环发送心跳 + # 读取 XML 中的上次观看进度 + last_viewed_time_str = root.findtext("lastViewedTime") + current_time = float(last_viewed_time_str) if last_viewed_time_str else 0.0 + + print(f" [Start] 初始进度: {current_time}s / {total_time}s") + + is_first = "true" + + while True: + if current_time >= total_time: + print(" [Done] 进度已达到总时长。发送完成包...") + send_heartbeat(upd_status_url, user_id, course_id, item_id, history_id, + add_time=0, current_time=0, total_time=total_time, + finished=1, first_update=is_first) + print(" [Success] 任务完成!") + break + + step = ADD_TIME + if current_time + step >= total_time: + step = int(total_time - current_time) + 1 + current_time = total_time + finished_flag = 1 + post_current_time = 0 + else: + current_time += step + finished_flag = 0 + post_current_time = current_time + + print( + f" [Sending] 发送心跳: 进度 {post_current_time if post_current_time > 0 else total_time}/{total_time} (add {step}s)...") + + success = send_heartbeat(upd_status_url, user_id, course_id, item_id, history_id, + add_time=step, current_time=post_current_time, total_time=total_time, + finished=finished_flag, first_update=is_first) + + if not success: + print(" [Error] 心跳发送失败,停止。") + break + + if finished_flag == 1: + print(" [Done] 视频播放结束。") + break + + is_first = "false" + if HEARTBEAT_INTERVAL > 0: + time.sleep(HEARTBEAT_INTERVAL) + + +if __name__ == "__main__": + print("="*50) + print("手动刷课脚本") + print("="*50) + + c_id = TARGET_COURSE_ID + i_id = TARGET_ITEM_ID + + if not c_id: + c_id = input("请输入 Course ID (例如 677): ").strip() + if not i_id: + i_id = input("请输入 Item ID (例如 142573): ").strip() + + if c_id and i_id: + process_manual_video(c_id, i_id) + else: + print("输入无效,程序退出。")