123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142 |
- import signal
- import argparse
- import json
- import pymongo
- from queue import Queue
- from threading import Thread
- from urllib.request import urlopen
# Load the database connection settings from config.json.
with open("config.json", "r") as cfg_file:
    config = json.load(cfg_file)

db_url = config["db_url"]
db_name = config["db_name"]
db_collection = config["db_collection"]

# Connect to MongoDB and bind the two collections used by this script:
# `papers` holds the source records, `<collection>_data` the crawled payloads.
client = pymongo.MongoClient(db_url)
db = client[db_name]
papers = db[db_collection]
papers_data = db["{}_data".format(db_collection)]
def read_file(filename):
    """Read a JSON-Lines file (one JSON object per line) into a list.

    Blank lines (e.g. a trailing newline at end of file) are skipped
    instead of raising ``json.JSONDecodeError``.

    Args:
        filename: path to the JSON-Lines file.

    Returns:
        list of the parsed objects, in file order.
    """
    # JSON is UTF-8 by specification, so don't depend on the locale default.
    with open(filename, "r", encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]
def add_paper(file_path):
    """Bulk-load paper records from a JSON-Lines file into the `papers` collection.

    A unique index on `corpusid` makes re-runs idempotent: duplicates are
    reported by the BulkWriteError and the remaining documents are still
    inserted (ordered=False). A summary line is always printed.
    """
    papers.create_index("corpusid", unique=True)
    records = read_file(file_path)

    inserted = 0
    try:
        # ordered=False keeps inserting past duplicate-key errors.
        outcome = papers.insert_many(records, ordered=False)
        inserted = len(outcome.inserted_ids)
    except pymongo.errors.BulkWriteError as err:
        # Partial success: pull the count of documents that did go in.
        inserted = err.details['nInserted']
    finally:
        print("总插入数据: {0}, 已插入数据: {1}, 已存在数据: {2}".format(
            len(records), inserted, papers.count_documents({})))
def crawl_data():
    """Crawl every unconsumed paper URL using a pool of worker threads.

    Starts ``num_threads`` daemon workers, enqueues one (url, corpusid)
    task per paper that is not yet marked ``consumed``, waits for the
    queue to drain, then shuts the workers down with None sentinels.
    """
    papers_data.create_index("corpusid", unique=True)

    # Create the task queue and the worker threads.
    q = Queue()
    num_threads = 4
    threads = []
    for _ in range(num_threads):
        t = Thread(target=worker, args=(q,))
        t.daemon = True
        t.start()
        print("starting worker: {}".format(t.native_id))
        threads.append(t)

    # Enqueue every paper that has not been crawled yet.
    # BUG FIX: a stray `break` used to exit this loop after the first
    # unconsumed record, so only one URL was ever crawled per run.
    for data in papers.find():
        if data.get('consumed') is True:
            continue
        q.put((data["url"], data["corpusid"]))

    print("Waiting for the task queue to complete...")
    q.join()
    print("The task queue has been completed!")

    # One None sentinel per worker so each thread exits its loop.
    for _ in range(num_threads):
        q.put(None)
    for t in threads:
        print("stopping worker: {}".format(t.native_id))
        t.join()
def mark_data_as_consumed(id):
    """Set consumed=True on the paper identified by its corpusid.

    BUG FIX: the filter previously matched on '_id', but the caller
    (worker) passes the corpusid value, and '_id' here is the
    Mongo-generated ObjectId — so the update never matched a document
    and crawl_data() re-queued every record on each run. Filter on
    'corpusid' (the field the unique index is built on) instead.
    """
    papers.update_one({'corpusid': id}, {'$set': {'consumed': True}})
def worker(q):
    """Worker-thread loop: consume (url, corpusid) tasks from *q*.

    For each task it fetches the page data, stores a record in
    ``papers_data`` and marks the source paper consumed. Exceptions are
    logged and swallowed so one bad task cannot kill the thread. The
    loop exits when it receives a None sentinel from crawl_data().
    """
    while True:
        item = q.get()
        if item is None:
            # Sentinel: shut this worker down (no task_done — the queue
            # was already joined before sentinels are enqueued).
            break
        url, corpusid = item
        print('crawling data: {}'.format(url))
        try:
            record = {
                'url': url,
                'corpusid': corpusid
            }
            # BUG FIX: the fetched payload used to be discarded by an
            # immediate reassignment of `data`; merge it into the record
            # instead. While fetch_data() is still a stub returning None
            # this behaves exactly as before.
            fetched = fetch_data(url)
            if isinstance(fetched, dict):
                record.update(fetched)
            papers_data.insert_one(record)
            mark_data_as_consumed(corpusid)
        except Exception as error:
            # Keep the thread alive: report and move on to the next task.
            print("An exception occurred:", error)
        finally:
            q.task_done()
def fetch_data(url):
    """Fetch the document at *url*.

    Currently a stub: the response body is not parsed yet (the
    ``json.load`` call is still disabled), so this always returns None.

    Returns:
        dict with the parsed payload, or None when nothing was parsed.
    """
    # BUG FIX: use the response as a context manager so the underlying
    # connection is always closed (it used to be leaked).
    with urlopen(url) as response:
        data = None
        # TODO: enable parsing once the payload format is confirmed:
        # data = json.load(response)
    return data if isinstance(data, dict) else None
def onSigInt(signum, frame):
    """SIGINT handler that deliberately ignores the signal.

    Installed in the __main__ block so that Ctrl-C does not interrupt
    the crawler mid-write; the handler simply returns.
    """
    return None
- if __name__ == "__main__":
- # 主进程退出信号
- signal.signal(signal.SIGINT, onSigInt)
- parser = argparse.ArgumentParser(description="Crawl data from URLs")
- parser.add_argument(
- "command", choices=["add_paper", "crawl_data"], help="Command to execute"
- )
- parser.add_argument("--path", help="Path to add to papers")
- args = parser.parse_args()
- if args.command == "add_paper":
- add_paper(args.path)
- elif args.command == "crawl_data":
- crawl_data()
|