@@ -1,3 +1,4 @@
+import signal
 import argparse
 import json
 import pymongo
@@ -16,6 +17,7 @@ db_collection = config["db_collection"]
 client = pymongo.MongoClient(db_url)
 db = client[db_name]
 papers = db[db_collection]
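+# Crawled payloads are stored in a sibling "<collection>_data" collection,
+# separate from the source paper records.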
+papers_data = db['{}_data'.format(db_collection)]


 def read_file(filename):
@@ -48,6 +50,8 @@ def add_paper(file_path):


 def crawl_data():
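+    # A unique index on corpusid makes duplicate inserts fail fast if the same
+    # paper is ever crawled twice.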
+    papers_data.create_index("corpusid", unique=True)
+
     # Create the task queue and worker threads
     q = Queue()
     num_threads = 4
@@ -56,20 +60,30 @@ def crawl_data():
         t = Thread(target=worker, args=(q,))
         t.daemon = True
         t.start()
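+        # NOTE: Thread.native_id requires Python 3.8+.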
+        print("starting worker: {}".format(t.native_id))
         threads.append(t)

     # Read URLs from the database and add them to the task queue
     for data in papers.find():
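+        # Skip papers that were already fetched in an earlier run.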
+        if data.get('consumed') is True:
+            continue
+        # print(data['corpusid'])
+        # print(data['url'])
         url = data["url"]
-        q.put(url)
+        corpusid = data["corpusid"]
+        q.put((url, corpusid))
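+        # NOTE: stop after the first unconsumed paper; remove this break to
+        # enqueue the whole collection.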
+        break

-    # Wait for the task queue to complete
+    print("Waiting for the task queue to complete...")
     q.join()
+    print("The task queue has been completed!")

     # Stop the worker threads
     for i in range(num_threads):
         q.put(None)
     for t in threads:
+        print("stopping worker: {}".format(t.native_id))
         t.join()

@@ -82,20 +96,39 @@ def worker(q):
         item = q.get()
         if item is None:
             break
-        data = fetch_data(item)
-        if data is not None:
-            papers.insert_one(data)
-            mark_data_as_consumed(item.id)
-        q.task_done()
+        print('crawling data: {}'.format(item[0]))
+        try:
+            data = fetch_data(item[0])
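+            # While fetch_data is stubbed out, replace its result with a minimal
+            # record so the rest of the pipeline can be exercised end to end.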
+            data = {
+                'url': item[0],
+                'corpusid': item[1]
+            }
+            if data is not None:
+                papers_data.insert_one(data)
+                mark_data_as_consumed(item[1])
+        except Exception as error:
+            # Log and keep going; one failed URL should not kill the worker.
+            print("An exception occurred:", error)
+        finally:
+            q.task_done()

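+# mark_data_as_consumed is called above but not shown in this diff; a minimal
+# sketch, assuming it flags the source paper in `papers` by corpusid:
+#
+#     def mark_data_as_consumed(corpusid):
+#         papers.update_one({"corpusid": corpusid}, {"$set": {"consumed": True}})
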
 def fetch_data(url):
     response = urlopen(url)
-    data = json.load(response)
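+    # Parsing is stubbed out for now, so this function always returns None and
+    # worker() falls back to the minimal url/corpusid record it builds itself.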
+    # time.sleep(5)
+    data = None
+    # data = json.load(response)
     return data if isinstance(data, dict) else None

+def onSigInt(signo, frame):
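+    # A no-op handler: SIGINT is swallowed, so Ctrl+C no longer raises
+    # KeyboardInterrupt in the main thread while the crawl is running.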
+    pass
+
+
 if __name__ == "__main__":
+    # Handle the main process's exit signal (SIGINT)
+    signal.signal(signal.SIGINT, onSigInt)
+
     parser = argparse.ArgumentParser(description="Crawl data from URLs")
     parser.add_argument(
         "command", choices=["add_paper", "crawl_data"], help="Command to execute"
|