spider.py

import argparse
import json
import pymongo
from queue import Queue
from threading import Thread
from urllib.request import urlopen

# Read the database parameters from the config file
with open("config.json", "r") as f:
    config = json.load(f)
db_url = config["db_url"]
db_name = config["db_name"]
db_collection = config["db_collection"]
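
# A minimal example of the expected config.json (the values below are
# hypothetical; only the three keys read above are assumed):
# {
#     "db_url": "mongodb://localhost:27017",
#     "db_name": "spider",
#     "db_collection": "papers"
# }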

# Connect to the database and get the papers collection
client = pymongo.MongoClient(db_url)
db = client[db_name]
papers = db[db_collection]

def read_file(filename):
    # Parse a JSON Lines file: one JSON object per line
    data_list = []
    with open(filename, "r") as f:
        for line in f:
            line_dict = json.loads(line)
            data_list.append(line_dict)
            # Each dict can be processed here if needed, e.g.:
            # print(line_dict['key'])
    return data_list
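
# Each line of the input file is expected to be one standalone JSON object
# (JSON Lines). A hypothetical record; only the "url" field is actually read
# by crawl_data below:
# {"url": "https://example.com/api/paper/1", "title": "Example Paper"}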

def add_paper(file_path):
    # Read the paper file and store its records in the database
    data_list = read_file(file_path)
    print(len(data_list))
    # Bulk-insert the records
    result = papers.insert_many(data_list)
    # Print the ids of the inserted documents
    print(result.inserted_ids)

def crawl_data():
    # Create the task queue and the worker threads
    q = Queue()
    num_threads = 4
    threads = []
    for i in range(num_threads):
        t = Thread(target=worker, args=(q,))
        t.daemon = True
        t.start()
        threads.append(t)
    # Read the stored documents from the database and enqueue them;
    # the whole document is enqueued so workers can use both "url" and "_id"
    for data in papers.find():
        q.put(data)
    # Wait until every queued task has been processed
    q.join()
    # Stop the threads: one None sentinel per worker
    for i in range(num_threads):
        q.put(None)
    for t in threads:
        t.join()

def mark_data_as_consumed(doc_id):
    papers.update_one({'_id': doc_id}, {'$set': {'consumed': True}})

def worker(q):
    while True:
        item = q.get()
        if item is None:
            # Sentinel: exit so the thread can be joined
            break
        # Each item is a document from the papers collection
        data = fetch_data(item["url"])
        if data is not None:
            papers.insert_one(data)
            mark_data_as_consumed(item["_id"])
        q.task_done()

def fetch_data(url):
    # Fetch the URL and parse the response body as JSON
    response = urlopen(url)
    data = json.load(response)
    return data if isinstance(data, dict) else None

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Crawl data from URLs")
    parser.add_argument(
        "command", choices=["add_paper", "crawl_data"], help="Command to execute"
    )
    parser.add_argument("--path", help="Path of the JSON Lines file to add to papers")
    args = parser.parse_args()
    if args.command == "add_paper":
        add_paper(args.path)
    elif args.command == "crawl_data":
        crawl_data()
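
# Example invocations (the file name is hypothetical):
#   python spider.py add_paper --path papers.jsonl
#   python spider.py crawl_data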