Init: 导入源码

This commit is contained in:
Kevin Wong
2026-01-09 09:33:32 +08:00
parent 8c9b52ffb6
commit f9e676900d
3 changed files with 576 additions and 0 deletions

370
ReplyBot.py Normal file
View File

@@ -0,0 +1,370 @@
import requests
import time
import json
import os
from openai import OpenAI
import tweepy
# API密钥配置
TWITTER_API_KEY = "3nt1jN4VvqUaaXGHv9AN5VsTV"
TWITTER_API_SECRET = "M2io73S7TzitFiBw825QIq8atyZRljbIDQuTpH39uFZanQ4XFh"
TWITTER_ACCESS_TOKEN = "1944636908-prxfjL6OIb56BQjuFTdChrUPh81OjmBbV7pfnWw"
TWITTER_ACCESS_SECRET = "D5AdCVRvIhGEmTmXA8hL5ciAUxIqNMZ3K3B3YejpqqNKj"
TWITTER_BEARER_TOKEN = "AAAAAAAAAAAAAAAAAAAAAGOd0gEAAAAALtv%2BzLsfGLLa5ydUt60ci6J5ce0%3DMBXViJ1NLY4XYdeuMq1xZQ98kHbeGK5lAJoV2j7Ssmcafk8Skn"
TWITTER_API_IO_KEY = "e3dad005b0e54bdc88c6178a89adec13"
DEEPSEEK_API_KEY = "sk-8a121704a9bc4ec6a5ab0ae16e0bc0ba"
# 创建DeepSeek客户端
deepseek_client = OpenAI(
api_key=DEEPSEEK_API_KEY,
base_url="https://api.deepseek.com"
)
# 已处理的回复文件
PROCESSED_REPLIES_FILE = "../../../processed_replies.json"
# OAuth 1.0a 认证
def get_oauth():
return tweepy.OAuth1UserHandler(
TWITTER_API_KEY,
TWITTER_API_SECRET,
TWITTER_ACCESS_TOKEN,
TWITTER_ACCESS_SECRET
)
# 使用高级搜索API获取推文
def advanced_search_tweets(query_type="Latest", keywords=None, username=None, max_results=10):
url = "https://api.twitterapi.io/twitter/tweet/advanced_search"
# 构建查询参数
params = {
"queryType": query_type,
"maxResults": max_results
}
# 添加可选参数
if keywords:
params["query"] = keywords
if username:
params["fromUsers"] = username
headers = {
"X-API-Key": TWITTER_API_IO_KEY
}
print(f"发送高级搜索请求到 TwitterAPI.io:")
print(f"URL: {url}")
print(f"参数: {params}")
print(f"Headers: {{'X-API-Key': '****' + TWITTER_API_IO_KEY[-4:]}}")
response = requests.get(url, headers=headers, params=params)
print(f"响应状态码: {response.status_code}")
if response.status_code == 200:
data = response.json()
print(f"响应数据结构: {list(data.keys() if isinstance(data, dict) else ['非字典'])}")
# 检查并提取推文
tweets = []
if isinstance(data, dict):
if "tweets" in data:
tweets = data["tweets"]
elif "data" in data:
tweets = data["data"]
print(f"找到 {len(tweets)} 条推文")
return data
else:
print(f"高级搜索请求失败: {response.status_code}")
print(response.text[:500]) # 只打印前500个字符避免过长
return None
# 获取用户最新推文 - 使用TwitterAPI.io的user/last_tweets端点
def get_user_last_tweets(username, count=5):
url = "https://api.twitterapi.io/twitter/user/last_tweets"
params = {
"userName": username,
"count": count
}
headers = {
"X-API-Key": TWITTER_API_IO_KEY
}
print(f"发送请求到 TwitterAPI.io 获取用户推文:")
print(f"URL: {url}")
print(f"参数: {params}")
print(f"Headers: {{'X-API-Key': '****' + TWITTER_API_IO_KEY[-4:]}}")
try:
response = requests.get(url, headers=headers, params=params)
print(f"响应状态码: {response.status_code}")
print(f"响应前100个字符: {response.text[:100]}")
if response.status_code == 200:
data = response.json()
if "status" in data and data["status"] == "success":
# 检查是否有推文数据
if "data" in data and "tweets" in data["data"] and len(data["data"]["tweets"]) > 0:
print(f"找到 {len(data['data']['tweets'])} 条推文")
# 把推文数据调整为与原代码兼容的格式
return {"tweets": data["data"]["tweets"]}
else:
print("API返回成功但没有推文数据")
print(f"响应键: {list(data.keys())}")
if "data" in data:
print(f"data键: {list(data['data'].keys())}")
return None
else:
print(f"API返回非成功状态: {data.get('status', 'unknown')}")
return None
else:
print(f"获取推文失败: {response.status_code}")
print(response.text[:500]) # 只打印前500个字符
return None
except Exception as e:
print(f"获取用户推文时出错: {str(e)}")
print("尝试使用高级搜索API作为备选方案...")
# 作为备选方案使用高级搜索API
return advanced_search_tweets(
query_type="Latest",
username=username,
max_results=count
)
# 获取推文回复
def get_tweet_replies(tweet_id, count=10):
url = "https://api.twitterapi.io/twitter/tweet/replies"
params = {
"tweetId": tweet_id,
"count": count
}
headers = {
"X-API-Key": TWITTER_API_IO_KEY
}
print(f"获取推文 {tweet_id} 的回复...")
print(f"URL: {url}")
print(f"参数: {params}")
response = requests.get(url, headers=headers, params=params)
print(f"响应状态码: {response.status_code}")
if response.status_code == 200:
reply_data = response.json()
print(f"回复数据结构: {list(reply_data.keys() if isinstance(reply_data, dict) else ['非字典'])}")
if isinstance(reply_data, dict) and "tweets" in reply_data:
print(f"找到 {len(reply_data['tweets'])} 条回复")
return reply_data
else:
print(f"获取回复失败: {response.status_code}")
print(response.text[:500]) # 只打印前500个字符
return None
# 使用DeepSeek生成回复内容
def generate_reply(tweet_content, reply_content):
prompt = f"""
我收到了以下推文和回复:
我的推文: "{tweet_content}"
用户的回复: "{reply_content}"
请生成一个友好且相关的回复不超过200个字符。回复应该表达感谢并继续对话。不要包含字符数或其他元信息。
"""
try:
response = deepseek_client.chat.completions.create(
model="deepseek-chat",
messages=[
{"role": "system", "content": "你是一个友好的社交媒体助手,擅长简短而有趣的回复。"},
{"role": "user", "content": prompt},
],
stream=False
)
reply = response.choices[0].message.content.strip()
print(f"生成的回复: {reply}")
return reply[:200] # 确保不超过200字符
except Exception as e:
print(f"生成回复失败: {e}")
return "谢谢你的回复!"
# 获取自身user_id
def get_my_user_id():
user = tweepy.Client(
consumer_key=TWITTER_API_KEY,
consumer_secret=TWITTER_API_SECRET,
access_token=TWITTER_ACCESS_TOKEN,
access_token_secret=TWITTER_ACCESS_SECRET
).get_me()
return user.data.id
# 点赞推文
def like_tweet(tweet_id):
try:
tweepy.Client(
consumer_key=TWITTER_API_KEY,
consumer_secret=TWITTER_API_SECRET,
access_token=TWITTER_ACCESS_TOKEN,
access_token_secret=TWITTER_ACCESS_SECRET
).like(tweet_id)
print(f"成功点赞推文: {tweet_id}")
return True
except Exception as e:
print(f"点赞失败: {e}")
return False
# 回复推文
def reply_to_tweet(tweet_id, reply_text):
try:
tweepy.Client(
consumer_key=TWITTER_API_KEY,
consumer_secret=TWITTER_API_SECRET,
access_token=TWITTER_ACCESS_TOKEN,
access_token_secret=TWITTER_ACCESS_SECRET
).create_tweet(in_reply_to_tweet_id=tweet_id, text=reply_text)
print(f"成功回复: {tweet_id}")
return True
except Exception as e:
print(f"回复失败: {e}")
return False
# 加载已处理的回复
def load_processed_replies():
try:
if os.path.exists(PROCESSED_REPLIES_FILE):
with open(PROCESSED_REPLIES_FILE, "r") as f:
data = json.load(f)
return data
return {}
except Exception as e:
print(f"加载已处理回复时出错: {e}")
return {}
# 保存已处理的回复
def save_processed_replies(processed_replies_dict):
try:
with open(PROCESSED_REPLIES_FILE, "w") as f:
json.dump(processed_replies_dict, f)
print(f"成功保存已处理的回复到 {PROCESSED_REPLIES_FILE}")
except Exception as e:
print(f"保存已处理回复时出错: {e}")
# 处理回复函数
def process_replies(replies, tweet_content, your_username, tweet_id, processed_replies_dict, my_user_id):
if tweet_id not in processed_replies_dict:
processed_replies_dict[tweet_id] = []
processed_replies = set(processed_replies_dict[tweet_id])
print("检查回复数据结构...")
print(f"回复类型: {type(replies)}")
# 尝试提取回复列表
reply_items = []
if isinstance(replies, dict):
if "tweets" in replies and isinstance(replies["tweets"], list):
reply_items = replies["tweets"]
print(f"使用 tweets 字段,找到 {len(reply_items)} 条回复")
elif "data" in replies:
data = replies["data"]
if isinstance(data, list):
reply_items = data
print(f"使用 data 字段,找到 {len(reply_items)} 条回复")
if not reply_items:
print("未能找到回复项")
return
for reply in reply_items:
try:
# 提取回复信息
reply_id = reply.get("id") or reply.get("id_str")
if not reply_id:
print("无法获取回复ID跳过")
continue
reply_content = reply.get("text", "") or reply.get("full_text", "") or reply.get("content", "")
# 提取作者信息
reply_author = None
if "user" in reply and isinstance(reply["user"], dict):
reply_author = reply["user"].get("username") or reply["user"].get("screen_name")
elif "username" in reply:
reply_author = reply["username"]
if not reply_author:
print(f"无法获取回复作者,使用默认值")
reply_author = "unknown_user"
print(f"\n处理回复: ID={reply_id}, 作者={reply_author}")
print(f"回复内容: {reply_content}")
# 跳过自己的回复或已处理的回复
if reply_author.lower() == your_username.lower():
print("跳过自己的回复")
continue
if reply_id in processed_replies:
print("跳过已处理的回复")
continue
# 点赞回复
like_result = like_tweet(reply_id)
# 生成回复内容
generated_reply = generate_reply(tweet_content, reply_content)
# 回复推文
reply_result = reply_to_tweet(reply_id, generated_reply)
if like_result and reply_result:
processed_replies.add(reply_id)
processed_replies_dict[tweet_id] = list(processed_replies)
save_processed_replies(processed_replies_dict)
time.sleep(2) # 点赞/回复间隔
except Exception as e:
print(f"处理回复时出错: {e}")
continue
# 主函数
def main():
print("Twitter留言点赞+回复机器人启动 [Tweepy版]")
processed_replies_dict = load_processed_replies()
your_username = "GWAaiagent" # 修改为你的Twitter用户名
my_user_id = get_my_user_id()
if not my_user_id:
print("无法获取自身user_id程序退出")
return
tweets_data = get_user_last_tweets(your_username, count=5)
if not tweets_data or "tweets" not in tweets_data or not tweets_data["tweets"]:
print("无法获取最近推文")
return
# 只处理最多5条推文防止API超量返回
if 'tweets' in tweets_data and len(tweets_data['tweets']) > 5:
tweets_data['tweets'] = tweets_data['tweets'][:5]
for tweet in tweets_data["tweets"]:
tweet_id = tweet["id"]
tweet_content = tweet.get("text", "") or tweet.get("full_text", "")
if not tweet_content:
continue
print(f"\n处理推文: {tweet_id}")
replies = get_tweet_replies(tweet_id)
if replies:
process_replies(replies, tweet_content, your_username, tweet_id, processed_replies_dict, my_user_id)
else:
print(f"推文 {tweet_id} 没有留言")
time.sleep(3) # 推文处理间隔
if __name__ == "__main__":
try:
main()
except Exception as e:
print(f"程序异常: {e}")