import argparse
import json
import pymongo
from queue import Queue
from threading import Thread
from urllib.request import urlopen

# Read the database settings from the config file
with open("config.json", "r") as f:
    config = json.load(f)
    db_url = config["db_url"]
    db_name = config["db_name"]
    db_collection = config["db_collection"]

# Connect to the database and get the papers collection
client = pymongo.MongoClient(db_url)
db = client[db_name]
papers = db[db_collection]
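
# A minimal config.json sketch for reference; the URI, database, and
# collection names below are illustrative assumptions only:
#
#   {
#     "db_url": "mongodb://localhost:27017",
#     "db_name": "papers_db",
#     "db_collection": "papers"
#   }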

def read_file(filename):
    # Read a JSON Lines file: each line is parsed as one dict
    data_list = []
    with open(filename, 'r') as f:
        for line in f:
            line_dict = json.loads(line)
            data_list.append(line_dict)
            # Each dict could be inspected here, e.g.:
            # print(line_dict['key'])
    return data_list

def add_paper(file_path):
    papers.create_index("corpusid", unique=True)
    # Read the paper file and store its records in the database
    data_list = read_file(file_path)
    # Bulk-insert the data; records with a duplicate corpusid are skipped
    inserted_count = 0
    try:
        result = papers.insert_many(data_list, ordered=False)
        inserted_count = len(result.inserted_ids)
    except pymongo.errors.BulkWriteError as e:
        inserted_count = e.details['nInserted']
    finally:
        # Report the result of the insert
        print("Records read: {0}, newly inserted: {1}, total in collection: {2}".format(
            len(data_list), inserted_count, papers.count_documents({})))

def crawl_data():
    # Create the task queue and worker threads
    q = Queue()
    num_threads = 4
    threads = []
    for i in range(num_threads):
        t = Thread(target=worker, args=(q,))
        t.daemon = True
        t.start()
        threads.append(t)
    # Read the stored papers and enqueue their (_id, url) pairs
    for data in papers.find():
        q.put((data["_id"], data["url"]))
    # Wait until the queue has been drained
    q.join()
    # Stop the worker threads
    for i in range(num_threads):
        q.put(None)
    for t in threads:
        t.join()

def mark_data_as_consumed(doc_id):
    papers.update_one({'_id': doc_id}, {'$set': {'consumed': True}})

def worker(q):
    while True:
        item = q.get()
        if item is None:
            break
        doc_id, url = item
        data = fetch_data(url)
        if data is not None:
            try:
                papers.insert_one(data)
            except pymongo.errors.DuplicateKeyError:
                pass
            mark_data_as_consumed(doc_id)
        q.task_done()

def fetch_data(url):
    # Return the URL's JSON payload as a dict, or None on failure
    try:
        data = json.load(urlopen(url))
    except (OSError, ValueError):
        return None
    return data if isinstance(data, dict) else None

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Crawl data from URLs")
    parser.add_argument(
        "command", choices=["add_paper", "crawl_data"], help="Command to execute"
    )
    parser.add_argument("--path", help="Path of the paper file to add")
    args = parser.parse_args()
    if args.command == "add_paper":
        if not args.path:
            parser.error("--path is required for add_paper")
        add_paper(args.path)
    elif args.command == "crawl_data":
        crawl_data()
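
# Example invocations, assuming this script is saved as crawl.py (a
# hypothetical name) and papers.jsonl is a JSON Lines file with one
# paper object per line, each containing a "corpusid" and a "url" field:
#
#   python crawl.py add_paper --path papers.jsonl
#   python crawl.py crawl_data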