370 lines
13 KiB
Python
370 lines
13 KiB
Python
import requests
|
||
import time
|
||
import json
|
||
import os
|
||
from openai import OpenAI
|
||
import tweepy
|
||
|
||
# API密钥配置
|
||
TWITTER_API_KEY = "3nt1jN4VvqUaaXGHv9AN5VsTV"
|
||
TWITTER_API_SECRET = "M2io73S7TzitFiBw825QIq8atyZRljbIDQuTpH39uFZanQ4XFh"
|
||
TWITTER_ACCESS_TOKEN = "1944636908-prxfjL6OIb56BQjuFTdChrUPh81OjmBbV7pfnWw"
|
||
TWITTER_ACCESS_SECRET = "D5AdCVRvIhGEmTmXA8hL5ciAUxIqNMZ3K3B3YejpqqNKj"
|
||
TWITTER_BEARER_TOKEN = "AAAAAAAAAAAAAAAAAAAAAGOd0gEAAAAALtv%2BzLsfGLLa5ydUt60ci6J5ce0%3DMBXViJ1NLY4XYdeuMq1xZQ98kHbeGK5lAJoV2j7Ssmcafk8Skn"
|
||
TWITTER_API_IO_KEY = "e3dad005b0e54bdc88c6178a89adec13"
|
||
DEEPSEEK_API_KEY = "sk-8a121704a9bc4ec6a5ab0ae16e0bc0ba"
|
||
|
||
# 创建DeepSeek客户端
|
||
deepseek_client = OpenAI(
|
||
api_key=DEEPSEEK_API_KEY,
|
||
base_url="https://api.deepseek.com"
|
||
)
|
||
|
||
# 已处理的回复文件
|
||
PROCESSED_REPLIES_FILE = "../../../processed_replies.json"
|
||
|
||
|
||
# OAuth 1.0a 认证
|
||
def get_oauth():
|
||
return tweepy.OAuth1UserHandler(
|
||
TWITTER_API_KEY,
|
||
TWITTER_API_SECRET,
|
||
TWITTER_ACCESS_TOKEN,
|
||
TWITTER_ACCESS_SECRET
|
||
)
|
||
|
||
|
||
# 使用高级搜索API获取推文
|
||
def advanced_search_tweets(query_type="Latest", keywords=None, username=None, max_results=10):
|
||
url = "https://api.twitterapi.io/twitter/tweet/advanced_search"
|
||
|
||
# 构建查询参数
|
||
params = {
|
||
"queryType": query_type,
|
||
"maxResults": max_results
|
||
}
|
||
|
||
# 添加可选参数
|
||
if keywords:
|
||
params["query"] = keywords
|
||
|
||
if username:
|
||
params["fromUsers"] = username
|
||
|
||
headers = {
|
||
"X-API-Key": TWITTER_API_IO_KEY
|
||
}
|
||
|
||
print(f"发送高级搜索请求到 TwitterAPI.io:")
|
||
print(f"URL: {url}")
|
||
print(f"参数: {params}")
|
||
print(f"Headers: {{'X-API-Key': '****' + TWITTER_API_IO_KEY[-4:]}}")
|
||
|
||
response = requests.get(url, headers=headers, params=params)
|
||
print(f"响应状态码: {response.status_code}")
|
||
|
||
if response.status_code == 200:
|
||
data = response.json()
|
||
print(f"响应数据结构: {list(data.keys() if isinstance(data, dict) else ['非字典'])}")
|
||
|
||
# 检查并提取推文
|
||
tweets = []
|
||
if isinstance(data, dict):
|
||
if "tweets" in data:
|
||
tweets = data["tweets"]
|
||
elif "data" in data:
|
||
tweets = data["data"]
|
||
|
||
print(f"找到 {len(tweets)} 条推文")
|
||
return data
|
||
else:
|
||
print(f"高级搜索请求失败: {response.status_code}")
|
||
print(response.text[:500]) # 只打印前500个字符,避免过长
|
||
return None
|
||
|
||
|
||
# 获取用户最新推文 - 使用TwitterAPI.io的user/last_tweets端点
|
||
def get_user_last_tweets(username, count=5):
|
||
url = "https://api.twitterapi.io/twitter/user/last_tweets"
|
||
params = {
|
||
"userName": username,
|
||
"count": count
|
||
}
|
||
headers = {
|
||
"X-API-Key": TWITTER_API_IO_KEY
|
||
}
|
||
print(f"发送请求到 TwitterAPI.io 获取用户推文:")
|
||
print(f"URL: {url}")
|
||
print(f"参数: {params}")
|
||
print(f"Headers: {{'X-API-Key': '****' + TWITTER_API_IO_KEY[-4:]}}")
|
||
|
||
try:
|
||
response = requests.get(url, headers=headers, params=params)
|
||
print(f"响应状态码: {response.status_code}")
|
||
print(f"响应前100个字符: {response.text[:100]}")
|
||
|
||
if response.status_code == 200:
|
||
data = response.json()
|
||
if "status" in data and data["status"] == "success":
|
||
# 检查是否有推文数据
|
||
if "data" in data and "tweets" in data["data"] and len(data["data"]["tweets"]) > 0:
|
||
print(f"找到 {len(data['data']['tweets'])} 条推文")
|
||
# 把推文数据调整为与原代码兼容的格式
|
||
return {"tweets": data["data"]["tweets"]}
|
||
else:
|
||
print("API返回成功但没有推文数据")
|
||
print(f"响应键: {list(data.keys())}")
|
||
if "data" in data:
|
||
print(f"data键: {list(data['data'].keys())}")
|
||
return None
|
||
else:
|
||
print(f"API返回非成功状态: {data.get('status', 'unknown')}")
|
||
return None
|
||
else:
|
||
print(f"获取推文失败: {response.status_code}")
|
||
print(response.text[:500]) # 只打印前500个字符
|
||
return None
|
||
except Exception as e:
|
||
print(f"获取用户推文时出错: {str(e)}")
|
||
print("尝试使用高级搜索API作为备选方案...")
|
||
# 作为备选方案,使用高级搜索API
|
||
return advanced_search_tweets(
|
||
query_type="Latest",
|
||
username=username,
|
||
max_results=count
|
||
)
|
||
|
||
|
||
# 获取推文回复
|
||
def get_tweet_replies(tweet_id, count=10):
|
||
url = "https://api.twitterapi.io/twitter/tweet/replies"
|
||
params = {
|
||
"tweetId": tweet_id,
|
||
"count": count
|
||
}
|
||
headers = {
|
||
"X-API-Key": TWITTER_API_IO_KEY
|
||
}
|
||
print(f"获取推文 {tweet_id} 的回复...")
|
||
print(f"URL: {url}")
|
||
print(f"参数: {params}")
|
||
|
||
response = requests.get(url, headers=headers, params=params)
|
||
print(f"响应状态码: {response.status_code}")
|
||
|
||
if response.status_code == 200:
|
||
reply_data = response.json()
|
||
print(f"回复数据结构: {list(reply_data.keys() if isinstance(reply_data, dict) else ['非字典'])}")
|
||
if isinstance(reply_data, dict) and "tweets" in reply_data:
|
||
print(f"找到 {len(reply_data['tweets'])} 条回复")
|
||
return reply_data
|
||
else:
|
||
print(f"获取回复失败: {response.status_code}")
|
||
print(response.text[:500]) # 只打印前500个字符
|
||
return None
|
||
|
||
|
||
# 使用DeepSeek生成回复内容
|
||
def generate_reply(tweet_content, reply_content):
|
||
prompt = f"""
|
||
我收到了以下推文和回复:
|
||
|
||
我的推文: "{tweet_content}"
|
||
用户的回复: "{reply_content}"
|
||
|
||
请生成一个友好且相关的回复,不超过200个字符。回复应该表达感谢并继续对话。不要包含字符数或其他元信息。
|
||
"""
|
||
try:
|
||
response = deepseek_client.chat.completions.create(
|
||
model="deepseek-chat",
|
||
messages=[
|
||
{"role": "system", "content": "你是一个友好的社交媒体助手,擅长简短而有趣的回复。"},
|
||
{"role": "user", "content": prompt},
|
||
],
|
||
stream=False
|
||
)
|
||
reply = response.choices[0].message.content.strip()
|
||
print(f"生成的回复: {reply}")
|
||
return reply[:200] # 确保不超过200字符
|
||
except Exception as e:
|
||
print(f"生成回复失败: {e}")
|
||
return "谢谢你的回复!"
|
||
|
||
|
||
# 获取自身user_id
|
||
def get_my_user_id():
|
||
user = tweepy.Client(
|
||
consumer_key=TWITTER_API_KEY,
|
||
consumer_secret=TWITTER_API_SECRET,
|
||
access_token=TWITTER_ACCESS_TOKEN,
|
||
access_token_secret=TWITTER_ACCESS_SECRET
|
||
).get_me()
|
||
return user.data.id
|
||
|
||
|
||
# 点赞推文
|
||
def like_tweet(tweet_id):
|
||
try:
|
||
tweepy.Client(
|
||
consumer_key=TWITTER_API_KEY,
|
||
consumer_secret=TWITTER_API_SECRET,
|
||
access_token=TWITTER_ACCESS_TOKEN,
|
||
access_token_secret=TWITTER_ACCESS_SECRET
|
||
).like(tweet_id)
|
||
print(f"成功点赞推文: {tweet_id}")
|
||
return True
|
||
except Exception as e:
|
||
print(f"点赞失败: {e}")
|
||
return False
|
||
|
||
|
||
# 回复推文
|
||
def reply_to_tweet(tweet_id, reply_text):
|
||
try:
|
||
tweepy.Client(
|
||
consumer_key=TWITTER_API_KEY,
|
||
consumer_secret=TWITTER_API_SECRET,
|
||
access_token=TWITTER_ACCESS_TOKEN,
|
||
access_token_secret=TWITTER_ACCESS_SECRET
|
||
).create_tweet(in_reply_to_tweet_id=tweet_id, text=reply_text)
|
||
print(f"成功回复: {tweet_id}")
|
||
return True
|
||
except Exception as e:
|
||
print(f"回复失败: {e}")
|
||
return False
|
||
|
||
|
||
# 加载已处理的回复
|
||
def load_processed_replies():
|
||
try:
|
||
if os.path.exists(PROCESSED_REPLIES_FILE):
|
||
with open(PROCESSED_REPLIES_FILE, "r") as f:
|
||
data = json.load(f)
|
||
return data
|
||
return {}
|
||
except Exception as e:
|
||
print(f"加载已处理回复时出错: {e}")
|
||
return {}
|
||
|
||
|
||
# 保存已处理的回复
|
||
def save_processed_replies(processed_replies_dict):
|
||
try:
|
||
with open(PROCESSED_REPLIES_FILE, "w") as f:
|
||
json.dump(processed_replies_dict, f)
|
||
print(f"成功保存已处理的回复到 {PROCESSED_REPLIES_FILE}")
|
||
except Exception as e:
|
||
print(f"保存已处理回复时出错: {e}")
|
||
|
||
|
||
# 处理回复函数
|
||
def process_replies(replies, tweet_content, your_username, tweet_id, processed_replies_dict, my_user_id):
|
||
if tweet_id not in processed_replies_dict:
|
||
processed_replies_dict[tweet_id] = []
|
||
|
||
processed_replies = set(processed_replies_dict[tweet_id])
|
||
|
||
print("检查回复数据结构...")
|
||
print(f"回复类型: {type(replies)}")
|
||
|
||
# 尝试提取回复列表
|
||
reply_items = []
|
||
if isinstance(replies, dict):
|
||
if "tweets" in replies and isinstance(replies["tweets"], list):
|
||
reply_items = replies["tweets"]
|
||
print(f"使用 tweets 字段,找到 {len(reply_items)} 条回复")
|
||
elif "data" in replies:
|
||
data = replies["data"]
|
||
if isinstance(data, list):
|
||
reply_items = data
|
||
print(f"使用 data 字段,找到 {len(reply_items)} 条回复")
|
||
|
||
if not reply_items:
|
||
print("未能找到回复项")
|
||
return
|
||
|
||
for reply in reply_items:
|
||
try:
|
||
# 提取回复信息
|
||
reply_id = reply.get("id") or reply.get("id_str")
|
||
if not reply_id:
|
||
print("无法获取回复ID,跳过")
|
||
continue
|
||
|
||
reply_content = reply.get("text", "") or reply.get("full_text", "") or reply.get("content", "")
|
||
|
||
# 提取作者信息
|
||
reply_author = None
|
||
if "user" in reply and isinstance(reply["user"], dict):
|
||
reply_author = reply["user"].get("username") or reply["user"].get("screen_name")
|
||
elif "username" in reply:
|
||
reply_author = reply["username"]
|
||
|
||
if not reply_author:
|
||
print(f"无法获取回复作者,使用默认值")
|
||
reply_author = "unknown_user"
|
||
|
||
print(f"\n处理回复: ID={reply_id}, 作者={reply_author}")
|
||
print(f"回复内容: {reply_content}")
|
||
|
||
# 跳过自己的回复或已处理的回复
|
||
if reply_author.lower() == your_username.lower():
|
||
print("跳过自己的回复")
|
||
continue
|
||
|
||
if reply_id in processed_replies:
|
||
print("跳过已处理的回复")
|
||
continue
|
||
|
||
# 点赞回复
|
||
like_result = like_tweet(reply_id)
|
||
# 生成回复内容
|
||
generated_reply = generate_reply(tweet_content, reply_content)
|
||
|
||
# 回复推文
|
||
reply_result = reply_to_tweet(reply_id, generated_reply)
|
||
if like_result and reply_result:
|
||
processed_replies.add(reply_id)
|
||
processed_replies_dict[tweet_id] = list(processed_replies)
|
||
save_processed_replies(processed_replies_dict)
|
||
time.sleep(2) # 点赞/回复间隔
|
||
except Exception as e:
|
||
print(f"处理回复时出错: {e}")
|
||
continue
|
||
|
||
|
||
# 主函数
|
||
def main():
|
||
print("Twitter留言点赞+回复机器人启动 [Tweepy版]")
|
||
processed_replies_dict = load_processed_replies()
|
||
your_username = "GWAaiagent" # 修改为你的Twitter用户名
|
||
my_user_id = get_my_user_id()
|
||
if not my_user_id:
|
||
print("无法获取自身user_id,程序退出")
|
||
return
|
||
tweets_data = get_user_last_tweets(your_username, count=5)
|
||
if not tweets_data or "tweets" not in tweets_data or not tweets_data["tweets"]:
|
||
print("无法获取最近推文")
|
||
return
|
||
# 只处理最多5条推文,防止API超量返回
|
||
if 'tweets' in tweets_data and len(tweets_data['tweets']) > 5:
|
||
tweets_data['tweets'] = tweets_data['tweets'][:5]
|
||
for tweet in tweets_data["tweets"]:
|
||
tweet_id = tweet["id"]
|
||
tweet_content = tweet.get("text", "") or tweet.get("full_text", "")
|
||
if not tweet_content:
|
||
continue
|
||
print(f"\n处理推文: {tweet_id}")
|
||
replies = get_tweet_replies(tweet_id)
|
||
if replies:
|
||
process_replies(replies, tweet_content, your_username, tweet_id, processed_replies_dict, my_user_id)
|
||
else:
|
||
print(f"推文 {tweet_id} 没有留言")
|
||
time.sleep(3) # 推文处理间隔
|
||
|
||
|
||
if __name__ == "__main__":
|
||
try:
|
||
main()
|
||
except Exception as e:
|
||
print(f"程序异常: {e}") |