spider.py

import argparse
import json
from queue import Queue
from threading import Thread
from urllib.error import URLError
from urllib.request import urlopen

import pymongo

# Read the database settings from the config file
with open("config.json", "r") as f:
    config = json.load(f)
db_url = config["db_url"]
db_name = config["db_name"]
db_collection = config["db_collection"]

# Connect to MongoDB and get the papers collection
client = pymongo.MongoClient(db_url)
db = client[db_name]
papers = db[db_collection]
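
# For reference, config.json is expected to provide the three keys read above.
# A minimal sketch (the values are placeholders, not taken from the original project):
#
#   {
#       "db_url": "mongodb://localhost:27017",
#       "db_name": "paper_db",
#       "db_collection": "papers"
#   }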

def read_file(filename):
    # Load a JSON Lines file: one JSON object per line.
    data_list = []
    with open(filename, "r") as f:
        for line in f:
            line_dict = json.loads(line)
            data_list.append(line_dict)
            # Each record could be inspected here, e.g.:
            # print(line_dict["key"])
    return data_list
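
# Each line of the input file is expected to be a standalone JSON object.
# Purely illustrative example; "corpusid" and "url" are the only fields the
# rest of this script relies on (unique index and crawling, respectively):
#
#   {"corpusid": 123456, "url": "https://example.org/paper/123456.json"}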

def add_paper(file_path):
    # Deduplicate on corpusid via a unique index.
    papers.create_index("corpusid", unique=True)
    # Read the paper file and store its records in the database.
    data_list = read_file(file_path)
    # Bulk-insert; with ordered=False, duplicate records are skipped
    # instead of aborting the whole batch.
    inserted_ids = 0
    try:
        result = papers.insert_many(data_list, ordered=False)
        inserted_ids = len(result.inserted_ids)
    except pymongo.errors.BulkWriteError as e:
        inserted_ids = e.details["nInserted"]
    finally:
        # Report the outcome of the insert.
        print("Records read: {0}, newly inserted: {1}, documents now in collection: {2}".format(
            len(data_list), inserted_ids, papers.count_documents({})))

def crawl_data():
    # Create the task queue and worker threads.
    q = Queue()
    num_threads = 4
    threads = []
    for i in range(num_threads):
        t = Thread(target=worker, args=(q,))
        t.daemon = True
        t.start()
        threads.append(t)
    # Read the stored papers and enqueue (document id, URL) pairs.
    for data in papers.find():
        q.put((data["_id"], data["url"]))
    # Wait for the task queue to drain.
    q.join()
    # Stop the workers by sending one sentinel per thread.
    for i in range(num_threads):
        q.put(None)
    for t in threads:
        t.join()

def mark_data_as_consumed(doc_id):
    # Flag a stored paper as already fetched.
    papers.update_one({"_id": doc_id}, {"$set": {"consumed": True}})

def worker(q):
    while True:
        item = q.get()
        if item is None:
            # Sentinel received: shut this worker down.
            break
        doc_id, url = item
        data = fetch_data(url)
        if data is not None:
            papers.insert_one(data)
            mark_data_as_consumed(doc_id)
        q.task_done()

def fetch_data(url):
    # Return the parsed JSON object, or None on download/parse errors.
    try:
        data = json.load(urlopen(url))
    except (URLError, json.JSONDecodeError):
        return None
    return data if isinstance(data, dict) else None

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Crawl data from URLs")
    parser.add_argument(
        "command", choices=["add_paper", "crawl_data"], help="Command to execute"
    )
    parser.add_argument("--path", help="Path of the JSON Lines file to add to papers")
    args = parser.parse_args()
    if args.command == "add_paper":
        if not args.path:
            parser.error("add_paper requires --path")
        add_paper(args.path)
    elif args.command == "crawl_data":
        crawl_data()
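
# Example invocations (assuming the script is saved as spider.py, config.json
# is in the working directory, and papers.jsonl is a placeholder input file):
#
#   python spider.py add_paper --path papers.jsonl
#   python spider.py crawl_data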