123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596 |
- # Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0
- # For details: https://bitbucket.org/ned/coveragepy/src/default/NOTICE.txt
- """Use the Appveyor API to download Windows artifacts."""
- import os
- import os.path
- import sys
- import zipfile
- import requests
- def make_auth_headers():
- """Make the authentication headers needed to use the Appveyor API."""
- with open("ci/appveyor.token") as f:
- token = f.read().strip()
- headers = {
- 'Authorization': 'Bearer {0}'.format(token),
- }
- return headers
- def make_url(url, **kwargs):
- """Build an Appveyor API url."""
- return "https://ci.appveyor.com/api" + url.format(**kwargs)
- def get_project_build(account_project):
- """Get the details of the latest Appveyor build."""
- url = make_url("/projects/{account_project}", account_project=account_project)
- response = requests.get(url, headers=make_auth_headers())
- return response.json()
- def download_latest_artifacts(account_project):
- """Download all the artifacts from the latest build."""
- build = get_project_build(account_project)
- jobs = build['build']['jobs']
- print "Build {0[build][version]}, {1} jobs: {0[build][message]}".format(build, len(jobs))
- for job in jobs:
- name = job['name'].partition(':')[2].split(',')[0].strip()
- print " {0}: {1[status]}, {1[artifactsCount]} artifacts".format(name, job)
- url = make_url("/buildjobs/{jobid}/artifacts", jobid=job['jobId'])
- response = requests.get(url, headers=make_auth_headers())
- artifacts = response.json()
- for artifact in artifacts:
- is_zip = artifact['type'] == "Zip"
- filename = artifact['fileName']
- print " {0}, {1} bytes".format(filename, artifact['size'])
- url = make_url(
- "/buildjobs/{jobid}/artifacts/{filename}",
- jobid=job['jobId'],
- filename=filename
- )
- download_url(url, filename, make_auth_headers())
- if is_zip:
- unpack_zipfile(filename)
- os.remove(filename)
- def ensure_dirs(filename):
- """Make sure the directories exist for `filename`."""
- dirname, _ = os.path.split(filename)
- if dirname and not os.path.exists(dirname):
- os.makedirs(dirname)
- def download_url(url, filename, headers):
- """Download a file from `url` to `filename`."""
- ensure_dirs(filename)
- response = requests.get(url, headers=headers, stream=True)
- if response.status_code == 200:
- with open(filename, 'wb') as f:
- for chunk in response.iter_content(16*1024):
- f.write(chunk)
- def unpack_zipfile(filename):
- """Unpack a zipfile, using the names in the zip."""
- with open(filename, 'rb') as fzip:
- z = zipfile.ZipFile(fzip)
- for name in z.namelist():
- print " extracting {0}".format(name)
- ensure_dirs(name)
- z.extract(name)
- if __name__ == "__main__":
- download_latest_artifacts(sys.argv[1])
|