123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324 |
- import json
- from searx import logger
- from searx.poolrequests import get
- from searx.utils import format_date_by_locale
- from datetime import datetime
- from dateutil.parser import parse as dateutil_parse
- from urllib import urlencode
- logger = logger.getChild('wikidata')
- result_count = 1
- wikidata_host = 'https://www.wikidata.org'
- wikidata_api = wikidata_host + '/w/api.php'
- url_search = wikidata_api \
- + '?action=query&list=search&format=json'\
- + '&srnamespace=0&srprop=sectiontitle&{query}'
- url_detail = wikidata_api\
- + '?action=wbgetentities&format=json'\
- + '&props=labels%7Cinfo%7Csitelinks'\
- + '%7Csitelinks%2Furls%7Cdescriptions%7Cclaims'\
- + '&{query}'
- url_map = 'https://www.openstreetmap.org/'\
- + '?lat={latitude}&lon={longitude}&zoom={zoom}&layers=M'
- def request(query, params):
- params['url'] = url_search.format(
- query=urlencode({'srsearch': query,
- 'srlimit': result_count}))
- return params
- def response(resp):
- results = []
- search_res = json.loads(resp.text)
- wikidata_ids = set()
- for r in search_res.get('query', {}).get('search', {}):
- wikidata_ids.add(r.get('title', ''))
- language = resp.search_params['language'].split('_')[0]
- if language == 'all':
- language = 'en'
- url = url_detail.format(query=urlencode({'ids': '|'.join(wikidata_ids),
- 'languages': language + '|en'}))
- htmlresponse = get(url)
- jsonresponse = json.loads(htmlresponse.content)
- for wikidata_id in wikidata_ids:
- results = results + getDetail(jsonresponse, wikidata_id, language, resp.search_params['language'])
- return results
- def getDetail(jsonresponse, wikidata_id, language, locale):
- results = []
- urls = []
- attributes = []
- result = jsonresponse.get('entities', {}).get(wikidata_id, {})
- title = result.get('labels', {}).get(language, {}).get('value', None)
- if title is None:
- title = result.get('labels', {}).get('en', {}).get('value', None)
- if title is None:
- return results
- description = result\
- .get('descriptions', {})\
- .get(language, {})\
- .get('value', None)
- if description is None:
- description = result\
- .get('descriptions', {})\
- .get('en', {})\
- .get('value', '')
- claims = result.get('claims', {})
- official_website = get_string(claims, 'P856', None)
- if official_website is not None:
- urls.append({'title': 'Official site', 'url': official_website})
- results.append({'title': title, 'url': official_website})
- wikipedia_link_count = 0
- if language != 'en':
- wikipedia_link_count += add_url(urls,
- 'Wikipedia (' + language + ')',
- get_wikilink(result, language +
- 'wiki'))
- wikipedia_en_link = get_wikilink(result, 'enwiki')
- wikipedia_link_count += add_url(urls,
- 'Wikipedia (en)',
- wikipedia_en_link)
- if wikipedia_link_count == 0:
- misc_language = get_wiki_firstlanguage(result, 'wiki')
- if misc_language is not None:
- add_url(urls,
- 'Wikipedia (' + misc_language + ')',
- get_wikilink(result, misc_language + 'wiki'))
- if language != 'en':
- add_url(urls,
- 'Wiki voyage (' + language + ')',
- get_wikilink(result, language + 'wikivoyage'))
- add_url(urls,
- 'Wiki voyage (en)',
- get_wikilink(result, 'enwikivoyage'))
- if language != 'en':
- add_url(urls,
- 'Wikiquote (' + language + ')',
- get_wikilink(result, language + 'wikiquote'))
- add_url(urls,
- 'Wikiquote (en)',
- get_wikilink(result, 'enwikiquote'))
- add_url(urls,
- 'Commons wiki',
- get_wikilink(result, 'commonswiki'))
- add_url(urls,
- 'Location',
- get_geolink(claims, 'P625', None))
- add_url(urls,
- 'Wikidata',
- 'https://www.wikidata.org/wiki/'
- + wikidata_id + '?uselang=' + language)
- musicbrainz_work_id = get_string(claims, 'P435')
- if musicbrainz_work_id is not None:
- add_url(urls,
- 'MusicBrainz',
- 'http://musicbrainz.org/work/'
- + musicbrainz_work_id)
- musicbrainz_artist_id = get_string(claims, 'P434')
- if musicbrainz_artist_id is not None:
- add_url(urls,
- 'MusicBrainz',
- 'http://musicbrainz.org/artist/'
- + musicbrainz_artist_id)
- musicbrainz_release_group_id = get_string(claims, 'P436')
- if musicbrainz_release_group_id is not None:
- add_url(urls,
- 'MusicBrainz',
- 'http://musicbrainz.org/release-group/'
- + musicbrainz_release_group_id)
- musicbrainz_label_id = get_string(claims, 'P966')
- if musicbrainz_label_id is not None:
- add_url(urls,
- 'MusicBrainz',
- 'http://musicbrainz.org/label/'
- + musicbrainz_label_id)
- # musicbrainz_area_id = get_string(claims, 'P982')
- # P1407 MusicBrainz series ID
- # P1004 MusicBrainz place ID
- # P1330 MusicBrainz instrument ID
- # P1407 MusicBrainz series ID
- postal_code = get_string(claims, 'P281', None)
- if postal_code is not None:
- attributes.append({'label': 'Postal code(s)', 'value': postal_code})
- date_of_birth = get_time(claims, 'P569', locale, None)
- if date_of_birth is not None:
- attributes.append({'label': 'Date of birth', 'value': date_of_birth})
- date_of_death = get_time(claims, 'P570', locale, None)
- if date_of_death is not None:
- attributes.append({'label': 'Date of death', 'value': date_of_death})
- if len(attributes) == 0 and len(urls) == 2 and len(description) == 0:
- results.append({
- 'url': urls[0]['url'],
- 'title': title,
- 'content': description
- })
- else:
- results.append({
- 'infobox': title,
- 'id': wikipedia_en_link,
- 'content': description,
- 'attributes': attributes,
- 'urls': urls
- })
- return results
- def add_url(urls, title, url):
- if url is not None:
- urls.append({'title': title, 'url': url})
- return 1
- else:
- return 0
- def get_mainsnak(claims, propertyName):
- propValue = claims.get(propertyName, {})
- if len(propValue) == 0:
- return None
- propValue = propValue[0].get('mainsnak', None)
- return propValue
- def get_string(claims, propertyName, defaultValue=None):
- propValue = claims.get(propertyName, {})
- if len(propValue) == 0:
- return defaultValue
- result = []
- for e in propValue:
- mainsnak = e.get('mainsnak', {})
- datavalue = mainsnak.get('datavalue', {})
- if datavalue is not None:
- result.append(datavalue.get('value', ''))
- if len(result) == 0:
- return defaultValue
- else:
- # TODO handle multiple urls
- return result[0]
- def get_time(claims, propertyName, locale, defaultValue=None):
- propValue = claims.get(propertyName, {})
- if len(propValue) == 0:
- return defaultValue
- result = []
- for e in propValue:
- mainsnak = e.get('mainsnak', {})
- datavalue = mainsnak.get('datavalue', {})
- if datavalue is not None:
- value = datavalue.get('value', '')
- result.append(value.get('time', ''))
- if len(result) == 0:
- date_string = defaultValue
- else:
- date_string = ', '.join(result)
- try:
- parsed_date = datetime.strptime(date_string, "+%Y-%m-%dT%H:%M:%SZ")
- except:
- if date_string.startswith('-'):
- return date_string.split('T')[0]
- try:
- parsed_date = dateutil_parse(date_string, fuzzy=False, default=False)
- except:
- logger.debug('could not parse date %s', date_string)
- return date_string.split('T')[0]
- return format_date_by_locale(parsed_date, locale)
- def get_geolink(claims, propertyName, defaultValue=''):
- mainsnak = get_mainsnak(claims, propertyName)
- if mainsnak is None:
- return defaultValue
- datatype = mainsnak.get('datatype', '')
- datavalue = mainsnak.get('datavalue', {})
- if datatype != 'globe-coordinate':
- return defaultValue
- value = datavalue.get('value', {})
- precision = value.get('precision', 0.0002)
- # there is no zoom information, deduce from precision (error prone)
- # samples :
- # 13 --> 5
- # 1 --> 6
- # 0.016666666666667 --> 9
- # 0.00027777777777778 --> 19
- # wolframalpha :
- # quadratic fit { {13, 5}, {1, 6}, {0.0166666, 9}, {0.0002777777,19}}
- # 14.1186-8.8322 x+0.625447 x^2
- if precision < 0.0003:
- zoom = 19
- else:
- zoom = int(15 - precision * 8.8322 + precision * precision * 0.625447)
- url = url_map\
- .replace('{latitude}', str(value.get('latitude', 0)))\
- .replace('{longitude}', str(value.get('longitude', 0)))\
- .replace('{zoom}', str(zoom))
- return url
- def get_wikilink(result, wikiid):
- url = result.get('sitelinks', {}).get(wikiid, {}).get('url', None)
- if url is None:
- return url
- elif url.startswith('http://'):
- url = url.replace('http://', 'https://')
- elif url.startswith('//'):
- url = 'https:' + url
- return url
- def get_wiki_firstlanguage(result, wikipatternid):
- for k in result.get('sitelinks', {}).keys():
- if k.endswith(wikipatternid) and len(k) == (2 + len(wikipatternid)):
- return k[0:2]
- return None
|