import argparse
import json
import pymongo
from queue import Queue
from threading import Thread
from urllib.request import urlopen

# Read the database parameters from the config file
with open("config.json", "r") as f:
    config = json.load(f)
db_url = config["db_url"]
db_name = config["db_name"]
db_collection = config["db_collection"]

# Connect to the database and get the papers collection
client = pymongo.MongoClient(db_url)
db = client[db_name]
papers = db[db_collection]


def read_file(filename):
    # Read a JSON Lines file: one JSON object per line
    data_list = []
    with open(filename, 'r') as f:
        for line in f:
            line_dict = json.loads(line)
            data_list.append(line_dict)
            # Each dictionary can be inspected here if needed, e.g.:
            # print(line_dict['key'])
    return data_list


def add_paper(file_path):
    # Enforce uniqueness on corpusid so duplicate papers are rejected
    papers.create_index("corpusid", unique=True)
    # Read the paper file and store its records in the database
    data_list = read_file(file_path)
    # Bulk-insert; ordered=False lets the insert continue past duplicates
    inserted_ids = 0
    try:
        result = papers.insert_many(data_list, ordered=False)
        inserted_ids = len(result.inserted_ids)
    except pymongo.errors.BulkWriteError as e:
        inserted_ids = e.details['nInserted']
    finally:
        # Report the insert result
        print("Records read: {0}, newly inserted: {1}, documents now in collection: {2}".format(
            len(data_list), inserted_ids, papers.count_documents({})))


def crawl_data():
    # Create the task queue and worker threads
    q = Queue()
    num_threads = 4
    threads = []
    for _ in range(num_threads):
        t = Thread(target=worker, args=(q,))
        t.daemon = True
        t.start()
        threads.append(t)

    # Read each document's id and URL from the database and queue them
    for doc in papers.find():
        q.put((doc["_id"], doc["url"]))

    # Wait for every queued task to be processed
    q.join()

    # Send a sentinel to each thread so it exits, then wait for the threads
    for _ in range(num_threads):
        q.put(None)
    for t in threads:
        t.join()


def mark_data_as_consumed(doc_id):
    papers.update_one({'_id': doc_id}, {'$set': {'consumed': True}})


def worker(q):
    while True:
        item = q.get()
        if item is None:
            break
        doc_id, url = item
        try:
            data = fetch_data(url)
            if data is not None:
                papers.insert_one(data)
                mark_data_as_consumed(doc_id)
        except pymongo.errors.PyMongoError:
            # Skip documents that fail to insert (e.g. duplicate corpusid)
            pass
        finally:
            # Always mark the task done so q.join() cannot hang on errors
            q.task_done()


def fetch_data(url):
    try:
        response = urlopen(url)
        data = json.load(response)
    except (OSError, ValueError):
        # Network errors or invalid JSON: treat the URL as yielding no data
        return None
    return data if isinstance(data, dict) else None


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Crawl data from URLs")
    parser.add_argument(
        "command",
        choices=["add_paper", "crawl_data"],
        help="Command to execute"
    )
    parser.add_argument("--path", help="Path of the paper file to add")
    args = parser.parse_args()
    if args.command == "add_paper":
        add_paper(args.path)
    elif args.command == "crawl_data":
        crawl_data()
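
# --- Usage notes (illustrative sketch, not part of the original source) ---
# A minimal config.json with the keys this script reads; the values below
# are placeholder assumptions:
# {
#     "db_url": "mongodb://localhost:27017",
#     "db_name": "papers_db",
#     "db_collection": "papers"
# }
#
# Example invocations, assuming the script is saved as crawler.py (the
# filename is an assumption) and the paper file is in JSON Lines format:
#   python crawler.py add_paper --path papers.jsonl
#   python crawler.py crawl_data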