import signal
import argparse
import json
import pymongo
from queue import Queue
from threading import Thread
from urllib.request import urlopen

# Read the database settings from the config file
with open("config.json", "r") as f:
    config = json.load(f)
db_url = config["db_url"]
db_name = config["db_name"]
db_collection = config["db_collection"]

# Connect to the database and open the papers collections
client = pymongo.MongoClient(db_url)
db = client[db_name]
papers = db[db_collection]
papers_data = db['{}_data'.format(db_collection)]


def read_file(filename):
    """Read a JSON Lines file: one JSON object per line."""
    data_list = []
    with open(filename, 'r') as f:
        for line in f:
            line_dict = json.loads(line)
            data_list.append(line_dict)
            # Each dict can be processed here if needed, e.g.:
            # print(line_dict['key'])
    return data_list


def add_paper(file_path):
    papers.create_index("corpusid", unique=True)
    # Read the paper file and store it in the database
    data_list = read_file(file_path)
    # Bulk-insert the records; duplicates on corpusid are skipped
    inserted_ids = 0
    try:
        result = papers.insert_many(data_list, ordered=False)
        inserted_ids = len(result.inserted_ids)
    except pymongo.errors.BulkWriteError as e:
        inserted_ids = e.details['nInserted']
    finally:
        # Report the insert results
        print("Records read: {0}, newly inserted: {1}, documents in collection: {2}".format(
            len(data_list), inserted_ids, papers.count_documents({})))


def crawl_data():
    papers_data.create_index("corpusid", unique=True)

    # Create the task queue and worker threads
    q = Queue()
    num_threads = 4
    threads = []
    for i in range(num_threads):
        t = Thread(target=worker, args=(q,))
        t.daemon = True
        t.start()
        print("starting worker: {}".format(t.native_id))
        threads.append(t)

    # Read URLs from the database and add them to the task queue,
    # skipping papers that have already been crawled
    for data in papers.find():
        if data.get('consumed') is True:
            continue
        url = data["url"]
        corpusid = data["corpusid"]
        q.put((url, corpusid))

    # print("Waiting for the task queue to complete...")
    q.join()
    print("The task queue has been completed!")

    # Stop the worker threads
    for i in range(num_threads):
        q.put(None)
    for t in threads:
        print("stopping worker: {}".format(t.native_id))
        t.join()


def mark_data_as_consumed(corpusid):
    # The queue carries corpus ids, so look the paper up by corpusid
    papers.update_one({'corpusid': corpusid}, {'$set': {'consumed': True}})


def worker(q):
    """Consume (url, corpusid) tasks: fetch each URL and store the result."""
    while True:
        item = q.get()
        if item is None:
            break
        url, corpusid = item
        print('crawling data: {}'.format(url))
        try:
            data = fetch_data(url)
            if data is not None:
                # Keep the fetched payload and tag it with its source URL and corpus id
                data['url'] = url
                data['corpusid'] = corpusid
                papers_data.insert_one(data)
                mark_data_as_consumed(corpusid)
        except Exception as error:
            print("An exception occurred:", error)
        finally:
            q.task_done()


def fetch_data(url):
    response = urlopen(url)
    # time.sleep(5)  # optional politeness delay (requires `import time`)
    data = json.load(response)
    return data if isinstance(data, dict) else None


def onSigInt(signo, frame):
    # Ignore SIGINT (Ctrl+C) so in-flight tasks are not interrupted
    pass


if __name__ == "__main__":
    # Install the SIGINT handler for the main process
    signal.signal(signal.SIGINT, onSigInt)

    parser = argparse.ArgumentParser(description="Crawl data from URLs")
    parser.add_argument(
        "command", choices=["add_paper", "crawl_data"], help="Command to execute"
    )
    parser.add_argument("--path", help="Path to the JSON Lines file to load into the papers collection")
    args = parser.parse_args()

    if args.command == "add_paper":
        add_paper(args.path)
    elif args.command == "crawl_data":
        crawl_data()
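
# Usage sketch (illustrative only): the script name, data file name, and
# MongoDB URL below are hypothetical placeholders, not values taken from
# this code.
#
# Expected config.json (the keys read at the top of this script):
#
#   {
#     "db_url": "mongodb://localhost:27017",
#     "db_name": "papers_db",
#     "db_collection": "papers"
#   }
#
# Example invocations:
#
#   python crawler.py add_paper --path papers.jsonl   # bulk-load a JSON Lines file into the papers collection
#   python crawler.py crawl_data                      # fetch each paper's URL with the worker pool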