Files
AI-Tweets/ReplyBot.py
2026-01-09 09:33:32 +08:00

370 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import requests
import time
import json
import os
from openai import OpenAI
import tweepy
# API密钥配置
TWITTER_API_KEY = "3nt1jN4VvqUaaXGHv9AN5VsTV"
TWITTER_API_SECRET = "M2io73S7TzitFiBw825QIq8atyZRljbIDQuTpH39uFZanQ4XFh"
TWITTER_ACCESS_TOKEN = "1944636908-prxfjL6OIb56BQjuFTdChrUPh81OjmBbV7pfnWw"
TWITTER_ACCESS_SECRET = "D5AdCVRvIhGEmTmXA8hL5ciAUxIqNMZ3K3B3YejpqqNKj"
TWITTER_BEARER_TOKEN = "AAAAAAAAAAAAAAAAAAAAAGOd0gEAAAAALtv%2BzLsfGLLa5ydUt60ci6J5ce0%3DMBXViJ1NLY4XYdeuMq1xZQ98kHbeGK5lAJoV2j7Ssmcafk8Skn"
TWITTER_API_IO_KEY = "e3dad005b0e54bdc88c6178a89adec13"
DEEPSEEK_API_KEY = "sk-8a121704a9bc4ec6a5ab0ae16e0bc0ba"
# 创建DeepSeek客户端
deepseek_client = OpenAI(
api_key=DEEPSEEK_API_KEY,
base_url="https://api.deepseek.com"
)
# 已处理的回复文件
PROCESSED_REPLIES_FILE = "../../../processed_replies.json"
# OAuth 1.0a 认证
def get_oauth():
return tweepy.OAuth1UserHandler(
TWITTER_API_KEY,
TWITTER_API_SECRET,
TWITTER_ACCESS_TOKEN,
TWITTER_ACCESS_SECRET
)
# 使用高级搜索API获取推文
def advanced_search_tweets(query_type="Latest", keywords=None, username=None, max_results=10):
url = "https://api.twitterapi.io/twitter/tweet/advanced_search"
# 构建查询参数
params = {
"queryType": query_type,
"maxResults": max_results
}
# 添加可选参数
if keywords:
params["query"] = keywords
if username:
params["fromUsers"] = username
headers = {
"X-API-Key": TWITTER_API_IO_KEY
}
print(f"发送高级搜索请求到 TwitterAPI.io:")
print(f"URL: {url}")
print(f"参数: {params}")
print(f"Headers: {{'X-API-Key': '****' + TWITTER_API_IO_KEY[-4:]}}")
response = requests.get(url, headers=headers, params=params)
print(f"响应状态码: {response.status_code}")
if response.status_code == 200:
data = response.json()
print(f"响应数据结构: {list(data.keys() if isinstance(data, dict) else ['非字典'])}")
# 检查并提取推文
tweets = []
if isinstance(data, dict):
if "tweets" in data:
tweets = data["tweets"]
elif "data" in data:
tweets = data["data"]
print(f"找到 {len(tweets)} 条推文")
return data
else:
print(f"高级搜索请求失败: {response.status_code}")
print(response.text[:500]) # 只打印前500个字符避免过长
return None
# 获取用户最新推文 - 使用TwitterAPI.io的user/last_tweets端点
def get_user_last_tweets(username, count=5):
url = "https://api.twitterapi.io/twitter/user/last_tweets"
params = {
"userName": username,
"count": count
}
headers = {
"X-API-Key": TWITTER_API_IO_KEY
}
print(f"发送请求到 TwitterAPI.io 获取用户推文:")
print(f"URL: {url}")
print(f"参数: {params}")
print(f"Headers: {{'X-API-Key': '****' + TWITTER_API_IO_KEY[-4:]}}")
try:
response = requests.get(url, headers=headers, params=params)
print(f"响应状态码: {response.status_code}")
print(f"响应前100个字符: {response.text[:100]}")
if response.status_code == 200:
data = response.json()
if "status" in data and data["status"] == "success":
# 检查是否有推文数据
if "data" in data and "tweets" in data["data"] and len(data["data"]["tweets"]) > 0:
print(f"找到 {len(data['data']['tweets'])} 条推文")
# 把推文数据调整为与原代码兼容的格式
return {"tweets": data["data"]["tweets"]}
else:
print("API返回成功但没有推文数据")
print(f"响应键: {list(data.keys())}")
if "data" in data:
print(f"data键: {list(data['data'].keys())}")
return None
else:
print(f"API返回非成功状态: {data.get('status', 'unknown')}")
return None
else:
print(f"获取推文失败: {response.status_code}")
print(response.text[:500]) # 只打印前500个字符
return None
except Exception as e:
print(f"获取用户推文时出错: {str(e)}")
print("尝试使用高级搜索API作为备选方案...")
# 作为备选方案使用高级搜索API
return advanced_search_tweets(
query_type="Latest",
username=username,
max_results=count
)
# 获取推文回复
def get_tweet_replies(tweet_id, count=10):
url = "https://api.twitterapi.io/twitter/tweet/replies"
params = {
"tweetId": tweet_id,
"count": count
}
headers = {
"X-API-Key": TWITTER_API_IO_KEY
}
print(f"获取推文 {tweet_id} 的回复...")
print(f"URL: {url}")
print(f"参数: {params}")
response = requests.get(url, headers=headers, params=params)
print(f"响应状态码: {response.status_code}")
if response.status_code == 200:
reply_data = response.json()
print(f"回复数据结构: {list(reply_data.keys() if isinstance(reply_data, dict) else ['非字典'])}")
if isinstance(reply_data, dict) and "tweets" in reply_data:
print(f"找到 {len(reply_data['tweets'])} 条回复")
return reply_data
else:
print(f"获取回复失败: {response.status_code}")
print(response.text[:500]) # 只打印前500个字符
return None
# 使用DeepSeek生成回复内容
def generate_reply(tweet_content, reply_content):
prompt = f"""
我收到了以下推文和回复:
我的推文: "{tweet_content}"
用户的回复: "{reply_content}"
请生成一个友好且相关的回复不超过200个字符。回复应该表达感谢并继续对话。不要包含字符数或其他元信息。
"""
try:
response = deepseek_client.chat.completions.create(
model="deepseek-chat",
messages=[
{"role": "system", "content": "你是一个友好的社交媒体助手,擅长简短而有趣的回复。"},
{"role": "user", "content": prompt},
],
stream=False
)
reply = response.choices[0].message.content.strip()
print(f"生成的回复: {reply}")
return reply[:200] # 确保不超过200字符
except Exception as e:
print(f"生成回复失败: {e}")
return "谢谢你的回复!"
# 获取自身user_id
def get_my_user_id():
user = tweepy.Client(
consumer_key=TWITTER_API_KEY,
consumer_secret=TWITTER_API_SECRET,
access_token=TWITTER_ACCESS_TOKEN,
access_token_secret=TWITTER_ACCESS_SECRET
).get_me()
return user.data.id
# 点赞推文
def like_tweet(tweet_id):
try:
tweepy.Client(
consumer_key=TWITTER_API_KEY,
consumer_secret=TWITTER_API_SECRET,
access_token=TWITTER_ACCESS_TOKEN,
access_token_secret=TWITTER_ACCESS_SECRET
).like(tweet_id)
print(f"成功点赞推文: {tweet_id}")
return True
except Exception as e:
print(f"点赞失败: {e}")
return False
# 回复推文
def reply_to_tweet(tweet_id, reply_text):
try:
tweepy.Client(
consumer_key=TWITTER_API_KEY,
consumer_secret=TWITTER_API_SECRET,
access_token=TWITTER_ACCESS_TOKEN,
access_token_secret=TWITTER_ACCESS_SECRET
).create_tweet(in_reply_to_tweet_id=tweet_id, text=reply_text)
print(f"成功回复: {tweet_id}")
return True
except Exception as e:
print(f"回复失败: {e}")
return False
# 加载已处理的回复
def load_processed_replies():
try:
if os.path.exists(PROCESSED_REPLIES_FILE):
with open(PROCESSED_REPLIES_FILE, "r") as f:
data = json.load(f)
return data
return {}
except Exception as e:
print(f"加载已处理回复时出错: {e}")
return {}
# 保存已处理的回复
def save_processed_replies(processed_replies_dict):
try:
with open(PROCESSED_REPLIES_FILE, "w") as f:
json.dump(processed_replies_dict, f)
print(f"成功保存已处理的回复到 {PROCESSED_REPLIES_FILE}")
except Exception as e:
print(f"保存已处理回复时出错: {e}")
# 处理回复函数
def process_replies(replies, tweet_content, your_username, tweet_id, processed_replies_dict, my_user_id):
if tweet_id not in processed_replies_dict:
processed_replies_dict[tweet_id] = []
processed_replies = set(processed_replies_dict[tweet_id])
print("检查回复数据结构...")
print(f"回复类型: {type(replies)}")
# 尝试提取回复列表
reply_items = []
if isinstance(replies, dict):
if "tweets" in replies and isinstance(replies["tweets"], list):
reply_items = replies["tweets"]
print(f"使用 tweets 字段,找到 {len(reply_items)} 条回复")
elif "data" in replies:
data = replies["data"]
if isinstance(data, list):
reply_items = data
print(f"使用 data 字段,找到 {len(reply_items)} 条回复")
if not reply_items:
print("未能找到回复项")
return
for reply in reply_items:
try:
# 提取回复信息
reply_id = reply.get("id") or reply.get("id_str")
if not reply_id:
print("无法获取回复ID跳过")
continue
reply_content = reply.get("text", "") or reply.get("full_text", "") or reply.get("content", "")
# 提取作者信息
reply_author = None
if "user" in reply and isinstance(reply["user"], dict):
reply_author = reply["user"].get("username") or reply["user"].get("screen_name")
elif "username" in reply:
reply_author = reply["username"]
if not reply_author:
print(f"无法获取回复作者,使用默认值")
reply_author = "unknown_user"
print(f"\n处理回复: ID={reply_id}, 作者={reply_author}")
print(f"回复内容: {reply_content}")
# 跳过自己的回复或已处理的回复
if reply_author.lower() == your_username.lower():
print("跳过自己的回复")
continue
if reply_id in processed_replies:
print("跳过已处理的回复")
continue
# 点赞回复
like_result = like_tweet(reply_id)
# 生成回复内容
generated_reply = generate_reply(tweet_content, reply_content)
# 回复推文
reply_result = reply_to_tweet(reply_id, generated_reply)
if like_result and reply_result:
processed_replies.add(reply_id)
processed_replies_dict[tweet_id] = list(processed_replies)
save_processed_replies(processed_replies_dict)
time.sleep(2) # 点赞/回复间隔
except Exception as e:
print(f"处理回复时出错: {e}")
continue
# 主函数
def main():
print("Twitter留言点赞+回复机器人启动 [Tweepy版]")
processed_replies_dict = load_processed_replies()
your_username = "GWAaiagent" # 修改为你的Twitter用户名
my_user_id = get_my_user_id()
if not my_user_id:
print("无法获取自身user_id程序退出")
return
tweets_data = get_user_last_tweets(your_username, count=5)
if not tweets_data or "tweets" not in tweets_data or not tweets_data["tweets"]:
print("无法获取最近推文")
return
# 只处理最多5条推文防止API超量返回
if 'tweets' in tweets_data and len(tweets_data['tweets']) > 5:
tweets_data['tweets'] = tweets_data['tweets'][:5]
for tweet in tweets_data["tweets"]:
tweet_id = tweet["id"]
tweet_content = tweet.get("text", "") or tweet.get("full_text", "")
if not tweet_content:
continue
print(f"\n处理推文: {tweet_id}")
replies = get_tweet_replies(tweet_id)
if replies:
process_replies(replies, tweet_content, your_username, tweet_id, processed_replies_dict, my_user_id)
else:
print(f"推文 {tweet_id} 没有留言")
time.sleep(3) # 推文处理间隔
if __name__ == "__main__":
try:
main()
except Exception as e:
print(f"程序异常: {e}")