spider.py
import requests
import os
import signal
import argparse
import json
import pymongo
from queue import Queue
from threading import Thread
from urllib.parse import urlparse

# S2_API_KEY = os.getenv('S2_API_KEY')
QUERY_FIELDS1 = 'paperId,corpusId,title,authors,year,url,tldr,venue,externalIds,fieldsOfStudy,s2FieldsOfStudy,abstract,citationCount,referenceCount,publicationTypes,influentialCitationCount,publicationDate,journal'
QUERY_FIELDS2 = 'paperId,corpusId,title,authors,year,url,venue,externalIds,fieldsOfStudy,s2FieldsOfStudy,abstract,citationCount,referenceCount,publicationTypes,influentialCitationCount,publicationDate,journal'
QUERY_FIELDS3 = 'paperId,corpusId,title,authors'

# Read the database settings from the config file
with open("config.json", "r") as f:
    config = json.load(f)

db_url = config["db_url"]
db_name = config["db_name"]
db_collection = config["db_collection"]
NUM_THREADS = config["num_threads"]
TASK_QUEUE_LEN = config["task_queue_len"]
S2_API_KEY = config["s2_api_key"]
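
# For reference, a minimal config.json matching the keys read above might look
# like this (values are placeholders, not taken from the original project):
# {
#     "db_url": "mongodb://localhost:27017",
#     "db_name": "s2",
#     "db_collection": "papers",
#     "num_threads": 4,
#     "task_queue_len": 100,
#     "s2_api_key": "<your Semantic Scholar API key>"
# }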

# Exit flag, set by the SIGINT handler below
quit_flag = False

# Connect to the database and open the papers collections
client = pymongo.MongoClient(db_url)
db = client[db_name]
papers = db[db_collection]
papers_data = db['{}_data'.format(db_collection)]


def read_file(filename):
    data_list = []
    with open(filename, 'r') as f:
        for line in f:
            line_dict = json.loads(line)
            data_list.append(line_dict)
            # Each parsed dict can be processed here if needed, e.g.:
            # print(line_dict['key'])
    return data_list
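
# Note: read_file() loads the entire file into memory at once and is currently
# unused; add_paper() below streams the file line by line instead (see its
# commented-out read_file call).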


def add_paper(file_path):
    papers.create_index("corpusid", unique=True)
    # Read the paper file and store the records in the database
    # data_list = read_file(file_path)
    # Insert the records in batches
    inserted_ids = 0
    try:
        sub_list = []
        with open(file_path, 'r') as f:
            for line in f:
                line_dict = json.loads(line)
                sub_list.append(line_dict)
                if len(sub_list) == 2000:
                    result = papers.insert_many(sub_list, ordered=False)
                    inserted_ids += len(result.inserted_ids)
                    sub_list = []
                    print('-------process', inserted_ids, '/', '7318795')
        if sub_list:
            result = papers.insert_many(sub_list, ordered=False)
            inserted_ids += len(result.inserted_ids)
            sub_list = []
    except pymongo.errors.BulkWriteError as e:
        inserted_ids = e.details['nInserted']
    finally:
        # Report the insert results
        print("Total records: {0}, inserted this run: {1}, documents in collection: {2}".format(
            7318795, inserted_ids, papers.count_documents({})))
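
# Each line of the input file is expected to be one JSON object containing at
# least the fields this script relies on ('corpusid' and 'url'). Illustrative
# shape only (placeholder values, not a real record):
# {"corpusid": 123, "url": "https://www.semanticscholar.org/paper/<paper-sha>"}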


def crawl_data():
    # papers_data.create_index("corpusId", unique=True)
    # Create the task queue and the worker threads
    q = Queue(TASK_QUEUE_LEN)
    # num_threads = 4
    threads = []
    for i in range(NUM_THREADS):
        t = Thread(target=worker, args=(q,))
        t.daemon = True
        t.start()
        print("starting worker: {}".format(t.native_id))
        threads.append(t)
    # Read URLs from the database and add them to the task queue
    while True:
        try:
            for data in papers.find({'$or': [{'consumed': {'$exists': False}}, {'consumed': False}]}):
                if quit_flag:
                    break
                if 'consumed' in data and data['consumed']:
                    print(data['corpusid'], "already inserted")
                    continue
                print('add {} to the task queue'.format(data['corpusid']))
                q.put((data['url'], data['corpusid']))
            break
        except Exception as e:
            # The cursor can fail mid-iteration; log the error and start over
            print('crawl_data error', e)
            continue

    print("Waiting for the task queue to complete...")
    q.join()
    print("The task queue has been completed!")
    # Stop the worker threads (one None sentinel per worker)
    for i in range(NUM_THREADS):
        q.put(None)
    for t in threads:
        print("stopping worker: {}".format(t.native_id))
        t.join()


def mark_data_as_consumed(corpus_id):
    result = papers.update_one({'corpusid': corpus_id},
                               {'$set': {'consumed': True}})


def worker(q):
    while True:
        item = q.get()
        if item is None:
            break
        url = urlparse(item[0]).path
        paper_id = url.split('/')[-1]
        corpus_id = item[1]
        print('crawling {} data: {}'.format(corpus_id, url))
        try:
            data = fetch_data(paper_id)
            if data is not None:
                # papers_data.insert_one(data)
                filter = {'corpusId': corpus_id}
                update = {'$set': data}
                result = papers_data.update_one(filter, update, upsert=True)
                mark_data_as_consumed(corpus_id)
                print(result.upserted_id, "inserted successfully")
        except Exception as error:
            # handle the exception
            print("An exception occurred:", error)
        finally:
            q.task_done()


def get_paper(paper_id):
    rsp = requests.get(f'https://api.semanticscholar.org/graph/v1/paper/{paper_id}',
                       headers={'x-api-key': S2_API_KEY},
                       params={'fields': QUERY_FIELDS1})
    rsp.raise_for_status()
    return rsp.json()


def get_citations(paper_id):
    edges = get_citation_edges(url=f'https://api.semanticscholar.org/graph/v1/paper/{paper_id}/citations',
                               headers={'x-api-key': S2_API_KEY},
                               params={'fields': QUERY_FIELDS2})
    return list(edge['citingPaper'] for edge in edges)


def get_references(paper_id):
    edges = get_citation_edges(url=f'https://api.semanticscholar.org/graph/v1/paper/{paper_id}/references',
                               headers={'x-api-key': S2_API_KEY},
                               params={'fields': QUERY_FIELDS2})
    return list(edge['citedPaper'] for edge in edges)


# Note: this endpoint is protected by human verification (CAPTCHA), so it may
# not work from a script; it is currently unused.
def get_related_papers(paper_id):
    rsp = requests.get(url=f'https://www.semanticscholar.org/api/1/paper/{paper_id}/related-papers?limit=10&recommenderType=relatedPapers',
                       headers={'x-api-key': S2_API_KEY},
                       params={'fields': QUERY_FIELDS3})
    rsp.raise_for_status()
    return rsp.json()['papers']


def get_recommended_papers(paper_id):
    rsp = requests.get(url=f'https://api.semanticscholar.org/recommendations/v1/papers/forpaper/{paper_id}',
                       headers={'x-api-key': S2_API_KEY},
                       params={'fields': QUERY_FIELDS2})
    rsp.raise_for_status()
    return rsp.json()['recommendedPapers']


def get_citation_edges(**req_kwargs):
    """This helps with API endpoints that involve paging."""
    page_size = 1000
    offset = 0
    while True:
        req_kwargs.setdefault('params', dict())
        req_kwargs['params']['limit'] = page_size
        req_kwargs['params']['offset'] = offset
        rsp = requests.get(**req_kwargs)
        rsp.raise_for_status()
        page = rsp.json()["data"]
        for element in page:
            yield element
        if len(page) < page_size:
            break  # no more pages
        offset += page_size


def fetch_data(paper_id):
    print("fetching data:", paper_id)
    data = get_paper(paper_id)
    # print(data)
    data['citations'] = get_citations(paper_id)
    data['references'] = get_references(paper_id)
    data['recommendedPapers'] = get_recommended_papers(paper_id)
    print('>>> fetch data OK, citations: {0}, references: {1}, recommendedPapers: {2}'.format(
        len(data.get('citations', [])), len(data.get('references', [])), len(data.get('recommendedPapers', []))
    ))
    return data if isinstance(data, dict) else None
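
# The dict returned by fetch_data() is what worker() upserts into the
# '<db_collection>_data' collection: one Graph API paper record selected by
# QUERY_FIELDS1, extended with its 'citations', 'references' and
# 'recommendedPapers' lists.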


def onSigInt(signo, frame):
    global quit_flag
    quit_flag = True
    print('Ctrl C: Waiting for the process to exit...')


if __name__ == "__main__":
    # Install the SIGINT handler so the main process can exit cleanly
    signal.signal(signal.SIGINT, onSigInt)
    parser = argparse.ArgumentParser(description="Crawl data from URLs")
    parser.add_argument(
        "command", choices=["add_paper", "crawl_data"], help="Command to execute"
    )
    parser.add_argument("--path", help="Path of the paper file to add")
    args = parser.parse_args()
    if args.command == "add_paper":
        add_paper(args.path)
    elif args.command == "crawl_data":
        crawl_data()
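
# Typical usage (config.json must be present in the working directory; the
# file name papers.jsonl below is an illustrative placeholder):
#   python spider.py add_paper --path papers.jsonl
#   python spider.py crawl_data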