spider.py

import signal
import argparse
import json
import pymongo
from queue import Queue
from threading import Thread
from urllib.request import urlopen


# Read the database settings from the config file
with open("config.json", "r") as f:
    config = json.load(f)
db_url = config["db_url"]
db_name = config["db_name"]
db_collection = config["db_collection"]
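# The config.json read above is assumed to look roughly like this
# (values are placeholders, not taken from the original project):
# {
#   "db_url": "mongodb://localhost:27017",
#   "db_name": "spider",
#   "db_collection": "papers"
# }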

# Connect to MongoDB and get the two collections used by this script
client = pymongo.MongoClient(db_url)
db = client[db_name]
papers = db[db_collection]
papers_data = db['{}_data'.format(db_collection)]


def read_file(filename):
    # Read a JSON Lines file: each line holds one JSON object
    data_list = []
    with open(filename, 'r') as f:
        for line in f:
            line_dict = json.loads(line)
            data_list.append(line_dict)
            # Each parsed dict can be inspected here, e.g.:
            # print(line_dict['key'])
    return data_list
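
# read_file expects JSON Lines input: one JSON object per line. Given the fields
# used elsewhere in this script, a line presumably looks something like
# (the extra field is illustrative):
# {"corpusid": 123, "url": "https://example.org/paper/123", "title": "..."}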


def add_paper(file_path):
    papers.create_index("corpusid", unique=True)
    # Read the paper file and load its records into the database
    data_list = read_file(file_path)
    # Bulk insert; ordered=False lets the write continue past duplicate corpusids
    inserted_ids = 0
    try:
        result = papers.insert_many(data_list, ordered=False)
        inserted_ids = len(result.inserted_ids)
    except pymongo.errors.BulkWriteError as e:
        inserted_ids = e.details['nInserted']
    finally:
        # Report the insert result
        print("Records read: {0}, newly inserted: {1}, now in collection: {2}".format(
            len(data_list), inserted_ids, papers.count_documents({})))


def crawl_data():
    papers_data.create_index("corpusid", unique=True)
    # Create the task queue and worker threads
    q = Queue()
    num_threads = 4
    threads = []
    for i in range(num_threads):
        t = Thread(target=worker, args=(q,))
        t.daemon = True
        t.start()
        print("starting worker: {}".format(t.native_id))
        threads.append(t)
    # Read URLs from the database and put them on the task queue
    for data in papers.find():
        if 'consumed' in data.keys() and data['consumed'] is True:
            continue
        # print(data['corpusid'])
        # print(data['url'])
        url = data["url"]
        corpusid = data["corpusid"]
        q.put((url, corpusid))
        # Only the first unconsumed document is queued per run (kept from the original)
        break

    print("Waiting for the task queue to complete...")
    q.join()
    print("The task queue has been completed!")
    # Stop the worker threads
    for i in range(num_threads):
        q.put(None)
    for t in threads:
        print("stopping worker: {}".format(t.native_id))
        t.join()


def mark_data_as_consumed(corpusid):
    # The queue carries corpusid (not _id), so match on that field
    papers.update_one({'corpusid': corpusid}, {'$set': {'consumed': True}})


def worker(q):
    while True:
        item = q.get()  # item is a (url, corpusid) tuple, or None to stop
        if item is None:
            break
        print('crawling data: {}'.format(item[0]))
        try:
            fetched = fetch_data(item[0])
            if fetched is not None:
                # Store the fetched payload, tagged with its source url and corpusid
                fetched['url'] = item[0]
                fetched['corpusid'] = item[1]
                papers_data.insert_one(fetched)
                mark_data_as_consumed(item[1])
        except Exception as error:
            # handle the exception
            print("An exception occurred:", error)
        finally:
            q.task_done()


def fetch_data(url):
    response = urlopen(url)
    # time.sleep(5)
    # NOTE: parsing is currently stubbed out, so this always returns None
    data = None
    # data = json.load(response)
    return data if isinstance(data, dict) else None
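

# A minimal un-stubbed variant of fetch_data, assuming the crawled URLs return
# JSON documents. The name fetch_data_json is hypothetical and nothing in this
# script calls it; fetch_data above keeps its original stubbed behavior.
def fetch_data_json(url):
    response = urlopen(url)
    try:
        data = json.load(response)
    except json.JSONDecodeError:
        return None
    return data if isinstance(data, dict) else None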


def onSigInt(signo, frame):
    # No-op handler: SIGINT is swallowed so Ctrl-C does not raise KeyboardInterrupt
    pass


if __name__ == "__main__":
    # Main-process exit signal: route SIGINT to the no-op handler above
    signal.signal(signal.SIGINT, onSigInt)
    parser = argparse.ArgumentParser(description="Crawl data from URLs")
    parser.add_argument(
        "command", choices=["add_paper", "crawl_data"], help="Command to execute"
    )
    parser.add_argument("--path", help="Path of the paper file to add")
    args = parser.parse_args()
    if args.command == "add_paper":
        add_paper(args.path)
    elif args.command == "crawl_data":
        crawl_data()
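
# Example invocations (the papers.jsonl path is only an illustration):
#   python spider.py add_paper --path papers.jsonl
#   python spider.py crawl_data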