@@ -0,0 +1,101 @@
+import argparse
+import json
+import pymongo
+from queue import Queue
+from threading import Thread
+from urllib.request import urlopen
+
+# Read the database settings from the config file
+with open("config.json", "r") as f:
+    config = json.load(f)
+db_url = config["db_url"]
+db_name = config["db_name"]
+db_collection = config["db_collection"]
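+# config.json is assumed to provide exactly these keys; a minimal example (placeholder values):
+# {"db_url": "mongodb://localhost:27017", "db_name": "papers_db", "db_collection": "papers"}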
+
+# Connect to the database and get the papers collection
+client = pymongo.MongoClient(db_url)
+db = client[db_name]
+papers = db[db_collection]
+
+
+def read_file(filename):
+    # Read a JSON Lines file: one JSON object per line
+    data_list = []
+    with open(filename, 'r') as f:
+        for line in f:
+            line_dict = json.loads(line)
+            data_list.append(line_dict)
+            # Each parsed dict could be processed here, for example:
+            # print(line_dict['key'])
+    return data_list
+
+
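+# Each line of the file passed to add_paper is assumed to be one JSON object describing
+# a paper, e.g. {"title": "Example Paper", "url": "https://example.org/paper.json"}
+# (placeholder fields); crawl_data() later reads the "url" field of each stored document.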
+def add_paper(file_path):
+    # Read the paper file and store it in the database
+    data_list = read_file(file_path)
+    print(len(data_list))
+    # Bulk-insert the data
+    result = papers.insert_many(data_list)
+    # Print the insertion result
+    print(result.inserted_ids)
+
+
+def crawl_data():
+    # Create the task queue and the worker threads
+    q = Queue()
+    num_threads = 4
+    threads = []
+    for _ in range(num_threads):
+        t = Thread(target=worker, args=(q,))
+        t.daemon = True
+        t.start()
+        threads.append(t)
+
+    # Queue the id and URL of every stored paper document
+    for data in papers.find():
+        q.put((data["_id"], data["url"]))
+
+    # Wait until every queued task has been processed
+    q.join()
+
+    # Send one sentinel per thread, then stop the threads
+    for _ in range(num_threads):
+        q.put(None)
+    for t in threads:
+        t.join()
+
+
+def mark_data_as_consumed(doc_id):
+    # Mark the source document as processed
+    papers.update_one({'_id': doc_id}, {'$set': {'consumed': True}})
+
+
+def worker(q):
+    while True:
+        item = q.get()
+        if item is None:
+            # Sentinel: no more work for this thread
+            q.task_done()
+            break
+        doc_id, url = item
+        data = fetch_data(url)
+        if data is not None:
+            papers.insert_one(data)
+        mark_data_as_consumed(doc_id)
+        q.task_done()
+
+
+def fetch_data(url):
+    # Download the URL and parse the JSON body; return None on failure
+    try:
+        with urlopen(url) as response:
+            data = json.load(response)
+    except (OSError, ValueError):
+        return None
+    return data if isinstance(data, dict) else None
+
+
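+# Example invocations (the script name is hypothetical):
+#   python crawler.py add_paper --path papers.jsonl
+#   python crawler.py crawl_data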
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(description="Import paper records and crawl data from their URLs")
+    parser.add_argument(
+        "command", choices=["add_paper", "crawl_data"], help="Command to execute"
+    )
+    parser.add_argument("--path", help="Path to the JSON Lines file to add to the papers collection")
+    args = parser.parse_args()
+
+    if args.command == "add_paper":
+        if args.path is None:
+            parser.error("--path is required for add_paper")
+        add_paper(args.path)
+    elif args.command == "crawl_data":
+        crawl_data()