import requests import time import json import os from openai import OpenAI import tweepy # API密钥配置 TWITTER_API_KEY = "3nt1jN4VvqUaaXGHv9AN5VsTV" TWITTER_API_SECRET = "M2io73S7TzitFiBw825QIq8atyZRljbIDQuTpH39uFZanQ4XFh" TWITTER_ACCESS_TOKEN = "1944636908-prxfjL6OIb56BQjuFTdChrUPh81OjmBbV7pfnWw" TWITTER_ACCESS_SECRET = "D5AdCVRvIhGEmTmXA8hL5ciAUxIqNMZ3K3B3YejpqqNKj" TWITTER_BEARER_TOKEN = "AAAAAAAAAAAAAAAAAAAAAGOd0gEAAAAALtv%2BzLsfGLLa5ydUt60ci6J5ce0%3DMBXViJ1NLY4XYdeuMq1xZQ98kHbeGK5lAJoV2j7Ssmcafk8Skn" TWITTER_API_IO_KEY = "e3dad005b0e54bdc88c6178a89adec13" DEEPSEEK_API_KEY = "sk-8a121704a9bc4ec6a5ab0ae16e0bc0ba" # 创建DeepSeek客户端 deepseek_client = OpenAI( api_key=DEEPSEEK_API_KEY, base_url="https://api.deepseek.com" ) # 已处理的回复文件 PROCESSED_REPLIES_FILE = "../../../processed_replies.json" # OAuth 1.0a 认证 def get_oauth(): return tweepy.OAuth1UserHandler( TWITTER_API_KEY, TWITTER_API_SECRET, TWITTER_ACCESS_TOKEN, TWITTER_ACCESS_SECRET ) # 使用高级搜索API获取推文 def advanced_search_tweets(query_type="Latest", keywords=None, username=None, max_results=10): url = "https://api.twitterapi.io/twitter/tweet/advanced_search" # 构建查询参数 params = { "queryType": query_type, "maxResults": max_results } # 添加可选参数 if keywords: params["query"] = keywords if username: params["fromUsers"] = username headers = { "X-API-Key": TWITTER_API_IO_KEY } print(f"发送高级搜索请求到 TwitterAPI.io:") print(f"URL: {url}") print(f"参数: {params}") print(f"Headers: {{'X-API-Key': '****' + TWITTER_API_IO_KEY[-4:]}}") response = requests.get(url, headers=headers, params=params) print(f"响应状态码: {response.status_code}") if response.status_code == 200: data = response.json() print(f"响应数据结构: {list(data.keys() if isinstance(data, dict) else ['非字典'])}") # 检查并提取推文 tweets = [] if isinstance(data, dict): if "tweets" in data: tweets = data["tweets"] elif "data" in data: tweets = data["data"] print(f"找到 {len(tweets)} 条推文") return data else: print(f"高级搜索请求失败: {response.status_code}") print(response.text[:500]) # 只打印前500个字符,避免过长 return None # 获取用户最新推文 - 使用TwitterAPI.io的user/last_tweets端点 def get_user_last_tweets(username, count=5): url = "https://api.twitterapi.io/twitter/user/last_tweets" params = { "userName": username, "count": count } headers = { "X-API-Key": TWITTER_API_IO_KEY } print(f"发送请求到 TwitterAPI.io 获取用户推文:") print(f"URL: {url}") print(f"参数: {params}") print(f"Headers: {{'X-API-Key': '****' + TWITTER_API_IO_KEY[-4:]}}") try: response = requests.get(url, headers=headers, params=params) print(f"响应状态码: {response.status_code}") print(f"响应前100个字符: {response.text[:100]}") if response.status_code == 200: data = response.json() if "status" in data and data["status"] == "success": # 检查是否有推文数据 if "data" in data and "tweets" in data["data"] and len(data["data"]["tweets"]) > 0: print(f"找到 {len(data['data']['tweets'])} 条推文") # 把推文数据调整为与原代码兼容的格式 return {"tweets": data["data"]["tweets"]} else: print("API返回成功但没有推文数据") print(f"响应键: {list(data.keys())}") if "data" in data: print(f"data键: {list(data['data'].keys())}") return None else: print(f"API返回非成功状态: {data.get('status', 'unknown')}") return None else: print(f"获取推文失败: {response.status_code}") print(response.text[:500]) # 只打印前500个字符 return None except Exception as e: print(f"获取用户推文时出错: {str(e)}") print("尝试使用高级搜索API作为备选方案...") # 作为备选方案,使用高级搜索API return advanced_search_tweets( query_type="Latest", username=username, max_results=count ) # 获取推文回复 def get_tweet_replies(tweet_id, count=10): url = "https://api.twitterapi.io/twitter/tweet/replies" params = { "tweetId": tweet_id, "count": count } headers = { "X-API-Key": TWITTER_API_IO_KEY } print(f"获取推文 {tweet_id} 的回复...") print(f"URL: {url}") print(f"参数: {params}") response = requests.get(url, headers=headers, params=params) print(f"响应状态码: {response.status_code}") if response.status_code == 200: reply_data = response.json() print(f"回复数据结构: {list(reply_data.keys() if isinstance(reply_data, dict) else ['非字典'])}") if isinstance(reply_data, dict) and "tweets" in reply_data: print(f"找到 {len(reply_data['tweets'])} 条回复") return reply_data else: print(f"获取回复失败: {response.status_code}") print(response.text[:500]) # 只打印前500个字符 return None # 使用DeepSeek生成回复内容 def generate_reply(tweet_content, reply_content): prompt = f""" 我收到了以下推文和回复: 我的推文: "{tweet_content}" 用户的回复: "{reply_content}" 请生成一个友好且相关的回复,不超过200个字符。回复应该表达感谢并继续对话。不要包含字符数或其他元信息。 """ try: response = deepseek_client.chat.completions.create( model="deepseek-chat", messages=[ {"role": "system", "content": "你是一个友好的社交媒体助手,擅长简短而有趣的回复。"}, {"role": "user", "content": prompt}, ], stream=False ) reply = response.choices[0].message.content.strip() print(f"生成的回复: {reply}") return reply[:200] # 确保不超过200字符 except Exception as e: print(f"生成回复失败: {e}") return "谢谢你的回复!" # 获取自身user_id def get_my_user_id(): user = tweepy.Client( consumer_key=TWITTER_API_KEY, consumer_secret=TWITTER_API_SECRET, access_token=TWITTER_ACCESS_TOKEN, access_token_secret=TWITTER_ACCESS_SECRET ).get_me() return user.data.id # 点赞推文 def like_tweet(tweet_id): try: tweepy.Client( consumer_key=TWITTER_API_KEY, consumer_secret=TWITTER_API_SECRET, access_token=TWITTER_ACCESS_TOKEN, access_token_secret=TWITTER_ACCESS_SECRET ).like(tweet_id) print(f"成功点赞推文: {tweet_id}") return True except Exception as e: print(f"点赞失败: {e}") return False # 回复推文 def reply_to_tweet(tweet_id, reply_text): try: tweepy.Client( consumer_key=TWITTER_API_KEY, consumer_secret=TWITTER_API_SECRET, access_token=TWITTER_ACCESS_TOKEN, access_token_secret=TWITTER_ACCESS_SECRET ).create_tweet(in_reply_to_tweet_id=tweet_id, text=reply_text) print(f"成功回复: {tweet_id}") return True except Exception as e: print(f"回复失败: {e}") return False # 加载已处理的回复 def load_processed_replies(): try: if os.path.exists(PROCESSED_REPLIES_FILE): with open(PROCESSED_REPLIES_FILE, "r") as f: data = json.load(f) return data return {} except Exception as e: print(f"加载已处理回复时出错: {e}") return {} # 保存已处理的回复 def save_processed_replies(processed_replies_dict): try: with open(PROCESSED_REPLIES_FILE, "w") as f: json.dump(processed_replies_dict, f) print(f"成功保存已处理的回复到 {PROCESSED_REPLIES_FILE}") except Exception as e: print(f"保存已处理回复时出错: {e}") # 处理回复函数 def process_replies(replies, tweet_content, your_username, tweet_id, processed_replies_dict, my_user_id): if tweet_id not in processed_replies_dict: processed_replies_dict[tweet_id] = [] processed_replies = set(processed_replies_dict[tweet_id]) print("检查回复数据结构...") print(f"回复类型: {type(replies)}") # 尝试提取回复列表 reply_items = [] if isinstance(replies, dict): if "tweets" in replies and isinstance(replies["tweets"], list): reply_items = replies["tweets"] print(f"使用 tweets 字段,找到 {len(reply_items)} 条回复") elif "data" in replies: data = replies["data"] if isinstance(data, list): reply_items = data print(f"使用 data 字段,找到 {len(reply_items)} 条回复") if not reply_items: print("未能找到回复项") return for reply in reply_items: try: # 提取回复信息 reply_id = reply.get("id") or reply.get("id_str") if not reply_id: print("无法获取回复ID,跳过") continue reply_content = reply.get("text", "") or reply.get("full_text", "") or reply.get("content", "") # 提取作者信息 reply_author = None if "user" in reply and isinstance(reply["user"], dict): reply_author = reply["user"].get("username") or reply["user"].get("screen_name") elif "username" in reply: reply_author = reply["username"] if not reply_author: print(f"无法获取回复作者,使用默认值") reply_author = "unknown_user" print(f"\n处理回复: ID={reply_id}, 作者={reply_author}") print(f"回复内容: {reply_content}") # 跳过自己的回复或已处理的回复 if reply_author.lower() == your_username.lower(): print("跳过自己的回复") continue if reply_id in processed_replies: print("跳过已处理的回复") continue # 点赞回复 like_result = like_tweet(reply_id) # 生成回复内容 generated_reply = generate_reply(tweet_content, reply_content) # 回复推文 reply_result = reply_to_tweet(reply_id, generated_reply) if like_result and reply_result: processed_replies.add(reply_id) processed_replies_dict[tweet_id] = list(processed_replies) save_processed_replies(processed_replies_dict) time.sleep(2) # 点赞/回复间隔 except Exception as e: print(f"处理回复时出错: {e}") continue # 主函数 def main(): print("Twitter留言点赞+回复机器人启动 [Tweepy版]") processed_replies_dict = load_processed_replies() your_username = "GWAaiagent" # 修改为你的Twitter用户名 my_user_id = get_my_user_id() if not my_user_id: print("无法获取自身user_id,程序退出") return tweets_data = get_user_last_tweets(your_username, count=5) if not tweets_data or "tweets" not in tweets_data or not tweets_data["tweets"]: print("无法获取最近推文") return # 只处理最多5条推文,防止API超量返回 if 'tweets' in tweets_data and len(tweets_data['tweets']) > 5: tweets_data['tweets'] = tweets_data['tweets'][:5] for tweet in tweets_data["tweets"]: tweet_id = tweet["id"] tweet_content = tweet.get("text", "") or tweet.get("full_text", "") if not tweet_content: continue print(f"\n处理推文: {tweet_id}") replies = get_tweet_replies(tweet_id) if replies: process_replies(replies, tweet_content, your_username, tweet_id, processed_replies_dict, my_user_id) else: print(f"推文 {tweet_id} 没有留言") time.sleep(3) # 推文处理间隔 if __name__ == "__main__": try: main() except Exception as e: print(f"程序异常: {e}")