diff options
author | Haibo Huang <hhb@google.com> | 2019-10-03 14:13:34 -0700 |
---|---|---|
committer | android-build-merger <android-build-merger@google.com> | 2019-10-03 14:13:34 -0700 |
commit | c254d57dfee5d09652e0a4b3f487c7a8e9684269 (patch) | |
tree | d4f1b19f018cbc3d55a4662bd96cb5de12364101 | |
parent | d31d86984ca113bc7af9c1703487041695506b9e (diff) | |
parent | 09b30c4fd80951644ca8c31b00dfcffe14965d9f (diff) | |
download | platform_external_python_asn1crypto-c254d57dfee5d09652e0a4b3f487c7a8e9684269.tar.gz platform_external_python_asn1crypto-c254d57dfee5d09652e0a4b3f487c7a8e9684269.tar.bz2 platform_external_python_asn1crypto-c254d57dfee5d09652e0a4b3f487c7a8e9684269.zip |
Upgrade python/asn1crypto to 1.0.0 am: bbe5f36e27 am: 9f54e18bda am: f4b1d844bc
am: 09b30c4fd8
Change-Id: I9cd0710bd5601152d8c30dfe5a16d1e282523bdb
67 files changed, 4464 insertions, 1789 deletions
diff --git a/.circleci/config.yml b/.circleci/config.yml new file mode 100644 index 0000000..500e4d6 --- /dev/null +++ b/.circleci/config.yml @@ -0,0 +1,27 @@ +version: 2 +jobs: + py26: + macos: + # macOS 10.12, last version with Python 2.6 + xcode: 9.2.0 + steps: + - checkout + - run: /usr/bin/python2.6 run.py deps + - run: /usr/bin/python2.6 run.py ci + pypy: + macos: + # macOS 10.14.4 + xcode: 10.3.0 + steps: + - checkout + - run: brew install pypy + - run: pypy run.py deps + - run: pypy run.py ci +workflows: + version: 2 + python-26: + jobs: + - py26 + python-pypy: + jobs: + - pypy diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..ab5738c --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,44 @@ +name: CI +on: [push] + +jobs: + build: + name: Python ${{ matrix.python }} on ${{ matrix.os }} ${{ matrix.arch }} + runs-on: ${{ matrix.os }} + strategy: + matrix: + os: + - ubuntu-18.04 + - macOS-10.14 + - windows-2019 + python: + - '2.7' + - '3.7' + arch: + - 'x86' + - 'x64' + exclude: + - os: ubuntu-18.04 + arch: x86 + - os: macOS-10.14 + arch: x86 + steps: + - uses: actions/checkout@master + - uses: actions/setup-python@v1 + with: + python-version: ${{ matrix.python }} + architecture: ${{ matrix.arch }} + - name: Install dependencies + run: python run.py deps + - name: Run test suite + run: python run.py ci + - name: Run test suite (OpenSSL/macOS) + run: python run.py ci + if: runner.os == 'macOS' + env: + OSCRYPTO_USE_OPENSSL: /usr/lib/libcrypto.dylib,/usr/lib/libssl.dylib + - name: Run test suite (Windows legacy API) + run: python run.py ci + if: runner.os == 'Windows' + env: + OSCRYPTO_USE_WINLEGACY: 'true' @@ -6,6 +6,7 @@ tests/output/ tmp/ *.egg-info/ *.pyc +*.pyo .python-version .DS_Store .coverage diff --git a/.travis.yml b/.travis.yml index ef69009..3c5b0d6 100644 --- a/.travis.yml +++ b/.travis.yml @@ -6,23 +6,29 @@ branches: matrix: include: - os: linux + dist: trusty language: python python: "2.6" - os: linux + dist: bionic language: python python: "2.7" - os: linux + dist: trusty language: python python: "3.2" - os: linux + dist: trusty language: python python: "3.3" - os: linux + dist: bionic language: python - python: "3.6" + python: "3.7" - os: linux + dist: xenial language: python - python: "pypy-5.3.1" + python: "pypy" script: - python run.py deps - python run.py ci @@ -1,4 +1,4 @@ -Copyright (c) 2015-2017 Will Bond <will@wbond.net> +Copyright (c) 2015-2019 Will Bond <will@wbond.net> Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in diff --git a/MANIFEST.in b/MANIFEST.in deleted file mode 100644 index 40e672e..0000000 --- a/MANIFEST.in +++ /dev/null @@ -1,3 +0,0 @@ -include LICENSE -include readme.md changelog.md -recursive-include docs *.md @@ -1,7 +1,5 @@ name: "asn1crypto" -description: - "A fast, pure Python library for parsing and serializing ASN.1 structures." - +description: "A fast, pure Python library for parsing and serializing ASN.1 structures." third_party { url { type: HOMEPAGE @@ -11,7 +9,11 @@ third_party { type: GIT value: "https://github.com/wbond/asn1crypto" } - version: "0.24.0" - last_upgrade_date { year: 2019 month: 2 day: 26 } + version: "1.0.0" license_type: NOTICE + last_upgrade_date { + year: 2019 + month: 10 + day: 2 + } } diff --git a/appveyor.yml b/appveyor.yml index fdd2dd6..564a665 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -1,97 +1,34 @@ version: "{build}" skip_tags: true +environment: + matrix: + - PYTHON_EXE: "C:\\Python26\\python.exe" + - PYTHON_EXE: "C:\\Python26-x64\\python.exe" + - PYTHON_EXE: "C:\\Python26-x64\\python.exe" + OSCRYPTO_USE_WINLEGACY: "true" + - PYTHON_EXE: "C:\\Python33\\python.exe" + - PYTHON_EXE: "C:\\Python33\\python.exe" + OSCRYPTO_USE_WINLEGACY: "true" + - PYTHON_EXE: "C:\\Python33-x64\\python.exe" + - PYTHON_EXE: "C:\\pypy2-v5.10.0-win32\\pypy.exe" + - PYTHON_EXE: "C:\\pypy2-v5.10.0-win32\\pypy.exe" + OSCRYPTO_USE_WINLEGACY: "true" install: - ps: |- $env:PYTMP = "${env:TMP}\py"; if (!(Test-Path "$env:PYTMP")) { New-Item -ItemType directory -Path "$env:PYTMP" | Out-Null; } - if (!(Test-Path "${env:PYTMP}\pypy2-v5.7.1-win32.zip")) { - (New-Object Net.WebClient).DownloadFile('https://bitbucket.org/pypy/pypy/downloads/pypy2-v5.7.1-win32.zip', "${env:PYTMP}\pypy2-v5.7.1-win32.zip"); + if ("${env:PYTHON_EXE}" -eq "C:\pypy2-v5.10.0-win32\pypy.exe") { + if (!(Test-Path "${env:PYTMP}\pypy2-v5.10.0-win32.zip")) { + (New-Object Net.WebClient).DownloadFile('https://bitbucket.org/pypy/pypy/downloads/pypy2-v5.10.0-win32.zip', "${env:PYTMP}\pypy2-v5.10.0-win32.zip"); + } + 7z x -y "${env:PYTMP}\pypy2-v5.10.0-win32.zip" -oC:\ | Out-Null; + & ${env:PYTHON_EXE} -m ensurepip --upgrade; } - 7z x -y "${env:PYTMP}\pypy2-v5.7.1-win32.zip" -oC:\ | Out-Null; - - [Byte[]] $geotrustCaBytes = 0x30,0x82,0x03,0x7C,0x30,0x82,0x02,0x64,0xA0,0x03,0x02,0x01,0x02, - 0x02,0x10,0x18,0xAC,0xB5,0x6A,0xFD,0x69,0xB6,0x15,0x3A,0x63,0x6C,0xAF,0xDA,0xFA,0xC4,0xA1,0x30, - 0x0D,0x06,0x09,0x2A,0x86,0x48,0x86,0xF7,0x0D,0x01,0x01,0x05,0x05,0x00,0x30,0x58,0x31,0x0B,0x30, - 0x09,0x06,0x03,0x55,0x04,0x06,0x13,0x02,0x55,0x53,0x31,0x16,0x30,0x14,0x06,0x03,0x55,0x04,0x0A, - 0x13,0x0D,0x47,0x65,0x6F,0x54,0x72,0x75,0x73,0x74,0x20,0x49,0x6E,0x63,0x2E,0x31,0x31,0x30,0x2F, - 0x06,0x03,0x55,0x04,0x03,0x13,0x28,0x47,0x65,0x6F,0x54,0x72,0x75,0x73,0x74,0x20,0x50,0x72,0x69, - 0x6D,0x61,0x72,0x79,0x20,0x43,0x65,0x72,0x74,0x69,0x66,0x69,0x63,0x61,0x74,0x69,0x6F,0x6E,0x20, - 0x41,0x75,0x74,0x68,0x6F,0x72,0x69,0x74,0x79,0x30,0x1E,0x17,0x0D,0x30,0x36,0x31,0x31,0x32,0x37, - 0x30,0x30,0x30,0x30,0x30,0x30,0x5A,0x17,0x0D,0x33,0x36,0x30,0x37,0x31,0x36,0x32,0x33,0x35,0x39, - 0x35,0x39,0x5A,0x30,0x58,0x31,0x0B,0x30,0x09,0x06,0x03,0x55,0x04,0x06,0x13,0x02,0x55,0x53,0x31, - 0x16,0x30,0x14,0x06,0x03,0x55,0x04,0x0A,0x13,0x0D,0x47,0x65,0x6F,0x54,0x72,0x75,0x73,0x74,0x20, - 0x49,0x6E,0x63,0x2E,0x31,0x31,0x30,0x2F,0x06,0x03,0x55,0x04,0x03,0x13,0x28,0x47,0x65,0x6F,0x54, - 0x72,0x75,0x73,0x74,0x20,0x50,0x72,0x69,0x6D,0x61,0x72,0x79,0x20,0x43,0x65,0x72,0x74,0x69,0x66, - 0x69,0x63,0x61,0x74,0x69,0x6F,0x6E,0x20,0x41,0x75,0x74,0x68,0x6F,0x72,0x69,0x74,0x79,0x30,0x82, - 0x01,0x22,0x30,0x0D,0x06,0x09,0x2A,0x86,0x48,0x86,0xF7,0x0D,0x01,0x01,0x01,0x05,0x00,0x03,0x82, - 0x01,0x0F,0x00,0x30,0x82,0x01,0x0A,0x02,0x82,0x01,0x01,0x00,0xBE,0xB8,0x15,0x7B,0xFF,0xD4,0x7C, - 0x7D,0x67,0xAD,0x83,0x64,0x7B,0xC8,0x42,0x53,0x2D,0xDF,0xF6,0x84,0x08,0x20,0x61,0xD6,0x01,0x59, - 0x6A,0x9C,0x44,0x11,0xAF,0xEF,0x76,0xFD,0x95,0x7E,0xCE,0x61,0x30,0xBB,0x7A,0x83,0x5F,0x02,0xBD, - 0x01,0x66,0xCA,0xEE,0x15,0x8D,0x6F,0xA1,0x30,0x9C,0xBD,0xA1,0x85,0x9E,0x94,0x3A,0xF3,0x56,0x88, - 0x00,0x31,0xCF,0xD8,0xEE,0x6A,0x96,0x02,0xD9,0xED,0x03,0x8C,0xFB,0x75,0x6D,0xE7,0xEA,0xB8,0x55, - 0x16,0x05,0x16,0x9A,0xF4,0xE0,0x5E,0xB1,0x88,0xC0,0x64,0x85,0x5C,0x15,0x4D,0x88,0xC7,0xB7,0xBA, - 0xE0,0x75,0xE9,0xAD,0x05,0x3D,0x9D,0xC7,0x89,0x48,0xE0,0xBB,0x28,0xC8,0x03,0xE1,0x30,0x93,0x64, - 0x5E,0x52,0xC0,0x59,0x70,0x22,0x35,0x57,0x88,0x8A,0xF1,0x95,0x0A,0x83,0xD7,0xBC,0x31,0x73,0x01, - 0x34,0xED,0xEF,0x46,0x71,0xE0,0x6B,0x02,0xA8,0x35,0x72,0x6B,0x97,0x9B,0x66,0xE0,0xCB,0x1C,0x79, - 0x5F,0xD8,0x1A,0x04,0x68,0x1E,0x47,0x02,0xE6,0x9D,0x60,0xE2,0x36,0x97,0x01,0xDF,0xCE,0x35,0x92, - 0xDF,0xBE,0x67,0xC7,0x6D,0x77,0x59,0x3B,0x8F,0x9D,0xD6,0x90,0x15,0x94,0xBC,0x42,0x34,0x10,0xC1, - 0x39,0xF9,0xB1,0x27,0x3E,0x7E,0xD6,0x8A,0x75,0xC5,0xB2,0xAF,0x96,0xD3,0xA2,0xDE,0x9B,0xE4,0x98, - 0xBE,0x7D,0xE1,0xE9,0x81,0xAD,0xB6,0x6F,0xFC,0xD7,0x0E,0xDA,0xE0,0x34,0xB0,0x0D,0x1A,0x77,0xE7, - 0xE3,0x08,0x98,0xEF,0x58,0xFA,0x9C,0x84,0xB7,0x36,0xAF,0xC2,0xDF,0xAC,0xD2,0xF4,0x10,0x06,0x70, - 0x71,0x35,0x02,0x03,0x01,0x00,0x01,0xA3,0x42,0x30,0x40,0x30,0x0F,0x06,0x03,0x55,0x1D,0x13,0x01, - 0x01,0xFF,0x04,0x05,0x30,0x03,0x01,0x01,0xFF,0x30,0x0E,0x06,0x03,0x55,0x1D,0x0F,0x01,0x01,0xFF, - 0x04,0x04,0x03,0x02,0x01,0x06,0x30,0x1D,0x06,0x03,0x55,0x1D,0x0E,0x04,0x16,0x04,0x14,0x2C,0xD5, - 0x50,0x41,0x97,0x15,0x8B,0xF0,0x8F,0x36,0x61,0x5B,0x4A,0xFB,0x6B,0xD9,0x99,0xC9,0x33,0x92,0x30, - 0x0D,0x06,0x09,0x2A,0x86,0x48,0x86,0xF7,0x0D,0x01,0x01,0x05,0x05,0x00,0x03,0x82,0x01,0x01,0x00, - 0x5A,0x70,0x7F,0x2C,0xDD,0xB7,0x34,0x4F,0xF5,0x86,0x51,0xA9,0x26,0xBE,0x4B,0xB8,0xAA,0xF1,0x71, - 0x0D,0xDC,0x61,0xC7,0xA0,0xEA,0x34,0x1E,0x7A,0x77,0x0F,0x04,0x35,0xE8,0x27,0x8F,0x6C,0x90,0xBF, - 0x91,0x16,0x24,0x46,0x3E,0x4A,0x4E,0xCE,0x2B,0x16,0xD5,0x0B,0x52,0x1D,0xFC,0x1F,0x67,0xA2,0x02, - 0x45,0x31,0x4F,0xCE,0xF3,0xFA,0x03,0xA7,0x79,0x9D,0x53,0x6A,0xD9,0xDA,0x63,0x3A,0xF8,0x80,0xD7, - 0xD3,0x99,0xE1,0xA5,0xE1,0xBE,0xD4,0x55,0x71,0x98,0x35,0x3A,0xBE,0x93,0xEA,0xAE,0xAD,0x42,0xB2, - 0x90,0x6F,0xE0,0xFC,0x21,0x4D,0x35,0x63,0x33,0x89,0x49,0xD6,0x9B,0x4E,0xCA,0xC7,0xE7,0x4E,0x09, - 0x00,0xF7,0xDA,0xC7,0xEF,0x99,0x62,0x99,0x77,0xB6,0x95,0x22,0x5E,0x8A,0xA0,0xAB,0xF4,0xB8,0x78, - 0x98,0xCA,0x38,0x19,0x99,0xC9,0x72,0x9E,0x78,0xCD,0x4B,0xAC,0xAF,0x19,0xA0,0x73,0x12,0x2D,0xFC, - 0xC2,0x41,0xBA,0x81,0x91,0xDA,0x16,0x5A,0x31,0xB7,0xF9,0xB4,0x71,0x80,0x12,0x48,0x99,0x72,0x73, - 0x5A,0x59,0x53,0xC1,0x63,0x52,0x33,0xED,0xA7,0xC9,0xD2,0x39,0x02,0x70,0xFA,0xE0,0xB1,0x42,0x66, - 0x29,0xAA,0x9B,0x51,0xED,0x30,0x54,0x22,0x14,0x5F,0xD9,0xAB,0x1D,0xC1,0xE4,0x94,0xF0,0xF8,0xF5, - 0x2B,0xF7,0xEA,0xCA,0x78,0x46,0xD6,0xB8,0x91,0xFD,0xA6,0x0D,0x2B,0x1A,0x14,0x01,0x3E,0x80,0xF0, - 0x42,0xA0,0x95,0x07,0x5E,0x6D,0xCD,0xCC,0x4B,0xA4,0x45,0x8D,0xAB,0x12,0xE8,0xB3,0xDE,0x5A,0xE5, - 0xA0,0x7C,0xE8,0x0F,0x22,0x1D,0x5A,0xE9,0x59; - $geotrustCa = New-Object System.Security.Cryptography.X509Certificates.X509Certificate2; - $geotrustCa.Import($geotrustCaBytes); - $rootStore = Get-Item cert:\LocalMachine\Root; - $rootStore.Open("ReadWrite"); - $rootStore.Add($geotrustCa); - $rootStore.Close(); cache: - '%TMP%\py\' build: off test_script: - - ps: '& C:\Python26\python run.py deps' - - ps: '& C:\Python26\python run.py ci' - - ps: '& C:\Python26-x64\python run.py deps' - - ps: '& C:\Python26-x64\python run.py ci' - - ps: '& C:\Python27\python run.py deps' - - ps: '& C:\Python27\python run.py ci' - - ps: > - $env:OSCRYPTO_USE_WINLEGACY = "true"; - & C:\Python27\python run.py ci; - remove-item env:\OSCRYPTO_USE_WINLEGACY; - - ps: '& C:\Python27-x64\python run.py deps' - - ps: '& C:\Python27-x64\python run.py ci' - - ps: '& C:\Python33\python run.py deps' - - ps: '& C:\Python33\python run.py ci' - - ps: > - $env:OSCRYPTO_USE_WINLEGACY = "true"; - & C:\Python33\python run.py ci; - remove-item env:\OSCRYPTO_USE_WINLEGACY; - - ps: '& C:\Python33-x64\python run.py deps' - - ps: '& C:\Python33-x64\python run.py ci' - - ps: '& C:\pypy2-v5.7.1-win32\pypy run.py deps' - - ps: '& C:\pypy2-v5.7.1-win32\pypy run.py ci' - - ps: > - $env:OSCRYPTO_USE_WINLEGACY = "true"; - & C:\pypy2-v5.7.1-win32\pypy run.py ci; - remove-item env:\OSCRYPTO_USE_WINLEGACY; + - cmd: "%PYTHON_EXE% run.py deps" + - cmd: "%PYTHON_EXE% run.py ci" diff --git a/asn1crypto/_elliptic_curve.py b/asn1crypto/_elliptic_curve.py deleted file mode 100644 index 8c0f12d..0000000 --- a/asn1crypto/_elliptic_curve.py +++ /dev/null @@ -1,314 +0,0 @@ -# coding: utf-8 - -""" -Classes and objects to represent prime-field elliptic curves and points on them. -Exports the following items: - - - PrimeCurve() - - PrimePoint() - - SECP192R1_CURVE - - SECP192R1_BASE_POINT - - SECP224R1_CURVE - - SECP224R1_BASE_POINT - - SECP256R1_CURVE - - SECP256R1_BASE_POINT - - SECP384R1_CURVE - - SECP384R1_BASE_POINT - - SECP521R1_CURVE - - SECP521R1_BASE_POINT - -The curve constants are all PrimeCurve() objects and the base point constants -are all PrimePoint() objects. - -Some of the following source code is derived from -http://webpages.charter.net/curryfans/peter/downloads.html, but has been heavily -modified to fit into this projects lint settings. The original project license -is listed below: - -Copyright (c) 2014 Peter Pearson - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in -all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -THE SOFTWARE. -""" - -from __future__ import unicode_literals, division, absolute_import, print_function - -from ._int import inverse_mod - - -class PrimeCurve(): - """ - Elliptic curve over a prime field. Characteristic two field curves are not - supported. - """ - - def __init__(self, p, a, b): - """ - The curve of points satisfying y^2 = x^3 + a*x + b (mod p) - - :param p: - The prime number as an integer - - :param a: - The component a as an integer - - :param b: - The component b as an integer - """ - - self.p = p - self.a = a - self.b = b - - def contains(self, point): - """ - :param point: - A Point object - - :return: - Boolean if the point is on this curve - """ - - y2 = point.y * point.y - x3 = point.x * point.x * point.x - return (y2 - (x3 + self.a * point.x + self.b)) % self.p == 0 - - -class PrimePoint(): - """ - A point on a prime-field elliptic curve - """ - - def __init__(self, curve, x, y, order=None): - """ - :param curve: - A PrimeCurve object - - :param x: - The x coordinate of the point as an integer - - :param y: - The y coordinate of the point as an integer - - :param order: - The order of the point, as an integer - optional - """ - - self.curve = curve - self.x = x - self.y = y - self.order = order - - # self.curve is allowed to be None only for INFINITY: - if self.curve: - if not self.curve.contains(self): - raise ValueError('Invalid EC point') - - if self.order: - if self * self.order != INFINITY: - raise ValueError('Invalid EC point') - - def __cmp__(self, other): - """ - :param other: - A PrimePoint object - - :return: - 0 if identical, 1 otherwise - """ - if self.curve == other.curve and self.x == other.x and self.y == other.y: - return 0 - else: - return 1 - - def __add__(self, other): - """ - :param other: - A PrimePoint object - - :return: - A PrimePoint object - """ - - # X9.62 B.3: - - if other == INFINITY: - return self - if self == INFINITY: - return other - assert self.curve == other.curve - if self.x == other.x: - if (self.y + other.y) % self.curve.p == 0: - return INFINITY - else: - return self.double() - - p = self.curve.p - - l_ = ((other.y - self.y) * inverse_mod(other.x - self.x, p)) % p - - x3 = (l_ * l_ - self.x - other.x) % p - y3 = (l_ * (self.x - x3) - self.y) % p - - return PrimePoint(self.curve, x3, y3) - - def __mul__(self, other): - """ - :param other: - An integer to multiple the Point by - - :return: - A PrimePoint object - """ - - def leftmost_bit(x): - assert x > 0 - result = 1 - while result <= x: - result = 2 * result - return result // 2 - - e = other - if self.order: - e = e % self.order - if e == 0: - return INFINITY - if self == INFINITY: - return INFINITY - assert e > 0 - - # From X9.62 D.3.2: - - e3 = 3 * e - negative_self = PrimePoint(self.curve, self.x, -self.y, self.order) - i = leftmost_bit(e3) // 2 - result = self - # print "Multiplying %s by %d (e3 = %d):" % ( self, other, e3 ) - while i > 1: - result = result.double() - if (e3 & i) != 0 and (e & i) == 0: - result = result + self - if (e3 & i) == 0 and (e & i) != 0: - result = result + negative_self - # print ". . . i = %d, result = %s" % ( i, result ) - i = i // 2 - - return result - - def __rmul__(self, other): - """ - :param other: - An integer to multiple the Point by - - :return: - A PrimePoint object - """ - - return self * other - - def double(self): - """ - :return: - A PrimePoint object that is twice this point - """ - - # X9.62 B.3: - - p = self.curve.p - a = self.curve.a - - l_ = ((3 * self.x * self.x + a) * inverse_mod(2 * self.y, p)) % p - - x3 = (l_ * l_ - 2 * self.x) % p - y3 = (l_ * (self.x - x3) - self.y) % p - - return PrimePoint(self.curve, x3, y3) - - -# This one point is the Point At Infinity for all purposes: -INFINITY = PrimePoint(None, None, None) - - -# NIST Curve P-192: -SECP192R1_CURVE = PrimeCurve( - 6277101735386680763835789423207666416083908700390324961279, - -3, - 0x64210519e59c80e70fa7e9ab72243049feb8deecc146b9b1 -) -SECP192R1_BASE_POINT = PrimePoint( - SECP192R1_CURVE, - 0x188da80eb03090f67cbf20eb43a18800f4ff0afd82ff1012, - 0x07192b95ffc8da78631011ed6b24cdd573f977a11e794811, - 6277101735386680763835789423176059013767194773182842284081 -) - - -# NIST Curve P-224: -SECP224R1_CURVE = PrimeCurve( - 26959946667150639794667015087019630673557916260026308143510066298881, - -3, - 0xb4050a850c04b3abf54132565044b0b7d7bfd8ba270b39432355ffb4 -) -SECP224R1_BASE_POINT = PrimePoint( - SECP224R1_CURVE, - 0xb70e0cbd6bb4bf7f321390b94a03c1d356c21122343280d6115c1d21, - 0xbd376388b5f723fb4c22dfe6cd4375a05a07476444d5819985007e34, - 26959946667150639794667015087019625940457807714424391721682722368061 -) - - -# NIST Curve P-256: -SECP256R1_CURVE = PrimeCurve( - 115792089210356248762697446949407573530086143415290314195533631308867097853951, - -3, - 0x5ac635d8aa3a93e7b3ebbd55769886bc651d06b0cc53b0f63bce3c3e27d2604b -) -SECP256R1_BASE_POINT = PrimePoint( - SECP256R1_CURVE, - 0x6b17d1f2e12c4247f8bce6e563a440f277037d812deb33a0f4a13945d898c296, - 0x4fe342e2fe1a7f9b8ee7eb4a7c0f9e162bce33576b315ececbb6406837bf51f5, - 115792089210356248762697446949407573529996955224135760342422259061068512044369 -) - - -# NIST Curve P-384: -SECP384R1_CURVE = PrimeCurve( - 39402006196394479212279040100143613805079739270465446667948293404245721771496870329047266088258938001861606973112319, # noqa - -3, - 0xb3312fa7e23ee7e4988e056be3f82d19181d9c6efe8141120314088f5013875ac656398d8a2ed19d2a85c8edd3ec2aef -) -SECP384R1_BASE_POINT = PrimePoint( - SECP384R1_CURVE, - 0xaa87ca22be8b05378eb1c71ef320ad746e1d3b628ba79b9859f741e082542a385502f25dbf55296c3a545e3872760ab7, - 0x3617de4a96262c6f5d9e98bf9292dc29f8f41dbd289a147ce9da3113b5f0b8c00a60b1ce1d7e819d7a431d7c90ea0e5f, - 39402006196394479212279040100143613805079739270465446667946905279627659399113263569398956308152294913554433653942643 -) - - -# NIST Curve P-521: -SECP521R1_CURVE = PrimeCurve( - 6864797660130609714981900799081393217269435300143305409394463459185543183397656052122559640661454554977296311391480858037121987999716643812574028291115057151, # noqa - -3, - 0x051953eb9618e1c9a1f929a21a0b68540eea2da725b99b315f3b8b489918ef109e156193951ec7e937b1652c0bd3bb1bf073573df883d2c34f1ef451fd46b503f00 # noqa -) -SECP521R1_BASE_POINT = PrimePoint( - SECP521R1_CURVE, - 0xc6858e06b70404e9cd9e3ecb662395b4429c648139053fb521f828af606b4d3dbaa14b5e77efe75928fe1dc127a2ffa8de3348b3c1856a429bf97e7e31c2e5bd66, # noqa - 0x11839296a789a3bc0045c8a5fb42c7d1bd998f54449579b446817afbd17273e662c97ee72995ef42640c550b9013fad0761353c7086a272c24088be94769fd16650, # noqa - 6864797660130609714981900799081393217269435300143305409394463459185543183397655394245057746333217197532963996371363321113864768612440380340372808892707005449 # noqa -) diff --git a/asn1crypto/_errors.py b/asn1crypto/_errors.py index cc785a5..d8797a2 100644 --- a/asn1crypto/_errors.py +++ b/asn1crypto/_errors.py @@ -1,9 +1,10 @@ # coding: utf-8 """ -Helper for formatting exception messages. Exports the following items: +Exports the following items: - unwrap() + - APIException() """ from __future__ import unicode_literals, division, absolute_import, print_function @@ -12,6 +13,14 @@ import re import textwrap +class APIException(Exception): + """ + An exception indicating an API has been removed from asn1crypto + """ + + pass + + def unwrap(string, *params): """ Takes a multi-line string and does the following: diff --git a/asn1crypto/_ffi.py b/asn1crypto/_ffi.py deleted file mode 100644 index 2a4f5bf..0000000 --- a/asn1crypto/_ffi.py +++ /dev/null @@ -1,45 +0,0 @@ -# coding: utf-8 - -""" -FFI helper compatibility functions. Exports the following items: - - - LibraryNotFoundError - - FFIEngineError - - bytes_from_buffer() - - buffer_from_bytes() - - null() -""" - -from __future__ import unicode_literals, division, absolute_import, print_function - -from ctypes import create_string_buffer - - -def buffer_from_bytes(initializer): - return create_string_buffer(initializer) - - -def bytes_from_buffer(buffer, maxlen=None): - return buffer.raw - - -def null(): - return None - - -class LibraryNotFoundError(Exception): - - """ - An exception when trying to find a shared library - """ - - pass - - -class FFIEngineError(Exception): - - """ - An exception when trying to instantiate ctypes or cffi - """ - - pass diff --git a/asn1crypto/_int.py b/asn1crypto/_int.py index d0c2319..094fc95 100644 --- a/asn1crypto/_int.py +++ b/asn1crypto/_int.py @@ -1,143 +1,6 @@ # coding: utf-8 - -""" -Function for calculating the modular inverse. Exports the following items: - - - inverse_mod() - -Source code is derived from -http://webpages.charter.net/curryfans/peter/downloads.html, but has been heavily -modified to fit into this projects lint settings. The original project license -is listed below: - -Copyright (c) 2014 Peter Pearson - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in -all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -THE SOFTWARE. -""" - from __future__ import unicode_literals, division, absolute_import, print_function -import math -import platform - -from .util import int_to_bytes, int_from_bytes - -# First try to use ctypes with OpenSSL for better performance -try: - from ._ffi import ( - buffer_from_bytes, - bytes_from_buffer, - FFIEngineError, - LibraryNotFoundError, - null, - ) - - # Some versions of PyPy have segfault issues, so we just punt on PyPy - if platform.python_implementation() == 'PyPy': - raise EnvironmentError() - - try: - from ._perf._big_num_ctypes import libcrypto - - def inverse_mod(a, p): - """ - Compute the modular inverse of a (mod p) - - :param a: - An integer - - :param p: - An integer - - :return: - An integer - """ - - ctx = libcrypto.BN_CTX_new() - - a_bytes = int_to_bytes(abs(a)) - p_bytes = int_to_bytes(abs(p)) - - a_buf = buffer_from_bytes(a_bytes) - a_bn = libcrypto.BN_bin2bn(a_buf, len(a_bytes), null()) - if a < 0: - libcrypto.BN_set_negative(a_bn, 1) - - p_buf = buffer_from_bytes(p_bytes) - p_bn = libcrypto.BN_bin2bn(p_buf, len(p_bytes), null()) - if p < 0: - libcrypto.BN_set_negative(p_bn, 1) - - r_bn = libcrypto.BN_mod_inverse(null(), a_bn, p_bn, ctx) - r_len_bits = libcrypto.BN_num_bits(r_bn) - r_len = int(math.ceil(r_len_bits / 8)) - r_buf = buffer_from_bytes(r_len) - libcrypto.BN_bn2bin(r_bn, r_buf) - r_bytes = bytes_from_buffer(r_buf, r_len) - result = int_from_bytes(r_bytes) - - libcrypto.BN_free(a_bn) - libcrypto.BN_free(p_bn) - libcrypto.BN_free(r_bn) - libcrypto.BN_CTX_free(ctx) - - return result - except (LibraryNotFoundError, FFIEngineError): - raise EnvironmentError() - -# If there was an issue using ctypes or OpenSSL, we fall back to pure python -except (EnvironmentError, ImportError): - - def inverse_mod(a, p): - """ - Compute the modular inverse of a (mod p) - - :param a: - An integer - - :param p: - An integer - - :return: - An integer - """ - - if a < 0 or p <= a: - a = a % p - - # From Ferguson and Schneier, roughly: - - c, d = a, p - uc, vc, ud, vd = 1, 0, 0, 1 - while c != 0: - q, c, d = divmod(d, c) + (c,) - uc, vc, ud, vd = ud - q * uc, vd - q * vc, uc, vc - - # At this point, d is the GCD, and ud*a+vd*p = d. - # If d == 1, this means that ud is a inverse. - - assert d == 1 - if ud > 0: - return ud - else: - return ud + p - def fill_width(bytes_, width): """ diff --git a/asn1crypto/_iri.py b/asn1crypto/_iri.py index 57ddd40..7394b4d 100644 --- a/asn1crypto/_iri.py +++ b/asn1crypto/_iri.py @@ -34,13 +34,16 @@ else: ) -def iri_to_uri(value): +def iri_to_uri(value, normalize=False): """ - Normalizes and encodes a unicode IRI into an ASCII byte string URI + Encodes a unicode IRI into an ASCII byte string URI :param value: A unicode string of an IRI + :param normalize: + A bool that controls URI normalization + :return: A byte string of the ASCII-encoded URI """ @@ -91,7 +94,7 @@ def iri_to_uri(value): if port is not None: default_http = scheme == b'http' and port == b'80' default_https = scheme == b'https' and port == b'443' - if not default_http and not default_https: + if not normalize or (not default_http and not default_https): netloc += b':' + port # RFC 3986 allows a path to contain sub-delims, plus "@" and ":" @@ -101,7 +104,7 @@ def iri_to_uri(value): # RFC 3986 allows the fragment to contain sub-delims, plus "@", ":" , "/" and "?" fragment = _urlquote(parsed.fragment, safe='/?!$&\'()*+,;=@:') - if query is None and fragment is None and path == b'/': + if normalize and query is None and fragment is None and path == b'/': path = None # Python 2.7 compat diff --git a/asn1crypto/_perf/__init__.py b/asn1crypto/_perf/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/asn1crypto/_perf/__init__.py +++ /dev/null diff --git a/asn1crypto/_perf/_big_num_ctypes.py b/asn1crypto/_perf/_big_num_ctypes.py deleted file mode 100644 index 8e37e9b..0000000 --- a/asn1crypto/_perf/_big_num_ctypes.py +++ /dev/null @@ -1,69 +0,0 @@ -# coding: utf-8 - -""" -ctypes interface for BN_mod_inverse() function from OpenSSL. Exports the -following items: - - - libcrypto - - BN_bn2bin() - - BN_CTX_free() - - BN_CTX_new() - - BN_free() - - BN_mod_inverse() - - BN_new() - - BN_num_bits() - - BN_set_negative() - -Will raise asn1crypto._ffi.LibraryNotFoundError() if libcrypto can not be -found. Will raise asn1crypto._ffi.FFIEngineError() if there is an error -interfacing with libcrypto. -""" - -from __future__ import unicode_literals, division, absolute_import, print_function - -import sys - -from ctypes import CDLL, c_int, c_char_p, c_void_p -from ctypes.util import find_library - -from .._ffi import LibraryNotFoundError, FFIEngineError - - -try: - # On Python 2, the unicode string here may raise a UnicodeDecodeError as it - # tries to join a bytestring path to the unicode name "crypto" - libcrypto_path = find_library(b'crypto' if sys.version_info < (3,) else 'crypto') - if not libcrypto_path: - raise LibraryNotFoundError('The library libcrypto could not be found') - - libcrypto = CDLL(libcrypto_path) - - libcrypto.BN_new.argtypes = [] - libcrypto.BN_new.restype = c_void_p - - libcrypto.BN_bin2bn.argtypes = [c_char_p, c_int, c_void_p] - libcrypto.BN_bin2bn.restype = c_void_p - - libcrypto.BN_bn2bin.argtypes = [c_void_p, c_char_p] - libcrypto.BN_bn2bin.restype = c_int - - libcrypto.BN_set_negative.argtypes = [c_void_p, c_int] - libcrypto.BN_set_negative.restype = None - - libcrypto.BN_num_bits.argtypes = [c_void_p] - libcrypto.BN_num_bits.restype = c_int - - libcrypto.BN_free.argtypes = [c_void_p] - libcrypto.BN_free.restype = None - - libcrypto.BN_CTX_new.argtypes = [] - libcrypto.BN_CTX_new.restype = c_void_p - - libcrypto.BN_CTX_free.argtypes = [c_void_p] - libcrypto.BN_CTX_free.restype = None - - libcrypto.BN_mod_inverse.argtypes = [c_void_p, c_void_p, c_void_p, c_void_p] - libcrypto.BN_mod_inverse.restype = c_void_p - -except (AttributeError): - raise FFIEngineError('Error initializing ctypes') diff --git a/asn1crypto/algos.py b/asn1crypto/algos.py index c805433..d49be26 100644 --- a/asn1crypto/algos.py +++ b/asn1crypto/algos.py @@ -114,6 +114,10 @@ class HmacAlgorithmId(ObjectIdentifier): '1.2.840.113549.2.11': 'sha512', '1.2.840.113549.2.12': 'sha512_224', '1.2.840.113549.2.13': 'sha512_256', + '2.16.840.1.101.3.4.2.13': 'sha3_224', + '2.16.840.1.101.3.4.2.14': 'sha3_256', + '2.16.840.1.101.3.4.2.15': 'sha3_384', + '2.16.840.1.101.3.4.2.16': 'sha3_512', } @@ -135,6 +139,14 @@ class DigestAlgorithmId(ObjectIdentifier): '2.16.840.1.101.3.4.2.3': 'sha512', '2.16.840.1.101.3.4.2.5': 'sha512_224', '2.16.840.1.101.3.4.2.6': 'sha512_256', + '2.16.840.1.101.3.4.2.7': 'sha3_224', + '2.16.840.1.101.3.4.2.8': 'sha3_256', + '2.16.840.1.101.3.4.2.9': 'sha3_384', + '2.16.840.1.101.3.4.2.10': 'sha3_512', + '2.16.840.1.101.3.4.2.11': 'shake128', + '2.16.840.1.101.3.4.2.12': 'shake256', + '2.16.840.1.101.3.4.2.17': 'shake128_len', + '2.16.840.1.101.3.4.2.18': 'shake256_len', } @@ -240,6 +252,10 @@ class SignedDigestAlgorithmId(ObjectIdentifier): '1.2.840.10045.4.3.2': 'sha256_ecdsa', '1.2.840.10045.4.3.3': 'sha384_ecdsa', '1.2.840.10045.4.3.4': 'sha512_ecdsa', + '2.16.840.1.101.3.4.3.9': 'sha3_224_ecdsa', + '2.16.840.1.101.3.4.3.10': 'sha3_256_ecdsa', + '2.16.840.1.101.3.4.3.11': 'sha3_384_ecdsa', + '2.16.840.1.101.3.4.3.12': 'sha3_512_ecdsa', # For when the digest is specified elsewhere in a Sequence '1.2.840.113549.1.1.1': 'rsassa_pkcs1v15', '1.2.840.10040.4.1': 'dsa', @@ -266,6 +282,10 @@ class SignedDigestAlgorithmId(ObjectIdentifier): 'sha384_rsa': '1.2.840.113549.1.1.12', 'sha512_ecdsa': '1.2.840.10045.4.3.4', 'sha512_rsa': '1.2.840.113549.1.1.13', + 'sha3_224_ecdsa': '2.16.840.1.101.3.4.3.9', + 'sha3_256_ecdsa': '2.16.840.1.101.3.4.3.10', + 'sha3_384_ecdsa': '2.16.840.1.101.3.4.3.11', + 'sha3_512_ecdsa': '2.16.840.1.101.3.4.3.12', } @@ -309,6 +329,10 @@ class SignedDigestAlgorithm(_ForceNullParameters, Sequence): 'sha256_ecdsa': 'ecdsa', 'sha384_ecdsa': 'ecdsa', 'sha512_ecdsa': 'ecdsa', + 'sha3_224_ecdsa': 'ecdsa', + 'sha3_256_ecdsa': 'ecdsa', + 'sha3_384_ecdsa': 'ecdsa', + 'sha3_512_ecdsa': 'ecdsa', 'ecdsa': 'ecdsa', } if algorithm in algo_map: @@ -454,6 +478,15 @@ class Pbes1Params(Sequence): ] +class CcmParams(Sequence): + # https://tools.ietf.org/html/rfc5084 + # aes_ICVlen: 4 | 6 | 8 | 10 | 12 | 14 | 16 + _fields = [ + ('aes_nonce', OctetString), + ('aes_icvlen', Integer), + ] + + class PSourceAlgorithmId(ObjectIdentifier): _map = { '1.2.840.113549.1.1.9': 'p_specified', @@ -563,6 +596,7 @@ class EncryptionAlgorithmId(ObjectIdentifier): '1.3.14.3.2.7': 'des', '1.2.840.113549.3.7': 'tripledes_3key', '1.2.840.113549.3.2': 'rc2', + '1.2.840.113549.3.4': 'rc4', '1.2.840.113549.3.9': 'rc5', # From http://csrc.nist.gov/groups/ST/crypto_apps_infra/csor/algorithms.html#AES '2.16.840.1.101.3.4.1.1': 'aes128_ecb', @@ -628,6 +662,10 @@ class EncryptionAlgorithm(_ForceNullParameters, Sequence): 'aes128_ofb': OctetString, 'aes192_ofb': OctetString, 'aes256_ofb': OctetString, + # From RFC5084 + 'aes128_ccm': CcmParams, + 'aes192_ccm': CcmParams, + 'aes256_ccm': CcmParams, # From PKCS#5 'pbes1_md2_des': Pbes1Params, 'pbes1_md5_des': Pbes1Params, diff --git a/asn1crypto/cms.py b/asn1crypto/cms.py index 9cad949..1fabc13 100644 --- a/asn1crypto/cms.py +++ b/asn1crypto/cms.py @@ -32,6 +32,7 @@ from .algos import ( EncryptionAlgorithm, HmacAlgorithm, KdfAlgorithm, + RSAESOAEPParams, SignedDigestAlgorithm, ) from .core import ( @@ -103,6 +104,14 @@ class CMSAttributeType(ObjectIdentifier): '1.2.840.113549.1.9.16.2.14': 'signature_time_stamp_token', # https://tools.ietf.org/html/rfc6211#page-5 '1.2.840.113549.1.9.52': 'cms_algorithm_protection', + # https://docs.microsoft.com/en-us/previous-versions/hh968145(v%3Dvs.85) + '1.3.6.1.4.1.311.2.4.1': 'microsoft_nested_signature', + # Some places refer to this as SPC_RFC3161_OBJID, others szOID_RFC3161_counterSign. + # https://docs.microsoft.com/en-us/windows/win32/api/wincrypt/ns-wincrypt-crypt_algorithm_identifier + # refers to szOID_RFC3161_counterSign as "1.2.840.113549.1.9.16.1.4", + # but that OID is also called szOID_TIMESTAMP_TOKEN. Because of there being + # no canonical source for this OID, we give it our own name + '1.3.6.1.4.1.311.3.3.1': 'microsoft_time_stamp_token', } @@ -649,7 +658,8 @@ class RecipientIdentifier(Choice): class KeyEncryptionAlgorithmId(ObjectIdentifier): _map = { - '1.2.840.113549.1.1.1': 'rsa', + '1.2.840.113549.1.1.1': 'rsaes_pkcs1v15', + '1.2.840.113549.1.1.7': 'rsaes_oaep', '2.16.840.1.101.3.4.1.5': 'aes128_wrap', '2.16.840.1.101.3.4.1.8': 'aes128_wrap_pad', '2.16.840.1.101.3.4.1.25': 'aes192_wrap', @@ -658,6 +668,18 @@ class KeyEncryptionAlgorithmId(ObjectIdentifier): '2.16.840.1.101.3.4.1.48': 'aes256_wrap_pad', } + _reverse_map = { + 'rsa': '1.2.840.113549.1.1.1', + 'rsaes_pkcs1v15': '1.2.840.113549.1.1.1', + 'rsaes_oaep': '1.2.840.113549.1.1.7', + 'aes128_wrap': '2.16.840.1.101.3.4.1.5', + 'aes128_wrap_pad': '2.16.840.1.101.3.4.1.8', + 'aes192_wrap': '2.16.840.1.101.3.4.1.25', + 'aes192_wrap_pad': '2.16.840.1.101.3.4.1.28', + 'aes256_wrap': '2.16.840.1.101.3.4.1.45', + 'aes256_wrap_pad': '2.16.840.1.101.3.4.1.48', + } + class KeyEncryptionAlgorithm(_ForceNullParameters, Sequence): _fields = [ @@ -665,6 +687,11 @@ class KeyEncryptionAlgorithm(_ForceNullParameters, Sequence): ('parameters', Any, {'optional': True}), ] + _oid_pair = ('algorithm', 'parameters') + _oid_specs = { + 'rsaes_oaep': RSAESOAEPParams, + } + class KeyTransRecipientInfo(Sequence): _fields = [ @@ -929,4 +956,6 @@ CMSAttribute._oid_specs = { 'counter_signature': SignerInfos, 'signature_time_stamp_token': SetOfContentInfo, 'cms_algorithm_protection': SetOfCMSAlgorithmProtection, + 'microsoft_nested_signature': SetOfContentInfo, + 'microsoft_time_stamp_token': SetOfContentInfo, } diff --git a/asn1crypto/core.py b/asn1crypto/core.py index 14a8203..933f8ca 100644 --- a/asn1crypto/core.py +++ b/asn1crypto/core.py @@ -49,6 +49,7 @@ Other type classes are defined that help compose the types listed above. from __future__ import unicode_literals, division, absolute_import, print_function from datetime import datetime, timedelta +from fractions import Fraction import binascii import copy import math @@ -60,7 +61,7 @@ from ._errors import unwrap from ._ordereddict import OrderedDict from ._types import type_name, str_cls, byte_cls, int_types, chr_cls from .parser import _parse, _dump_header -from .util import int_to_bytes, int_from_bytes, timezone, extended_datetime +from .util import int_to_bytes, int_from_bytes, timezone, extended_datetime, create_timezone, utc_with_dst if sys.version_info <= (3,): from cStringIO import StringIO as BytesIO @@ -230,7 +231,7 @@ class Asn1Value(object): return value def __init__(self, explicit=None, implicit=None, no_explicit=False, tag_type=None, class_=None, tag=None, - optional=None, default=None, contents=None): + optional=None, default=None, contents=None, method=None): """ The optional parameter is not used, but rather included so we don't have to delete it from the parameter dictionary when passing as keyword @@ -275,6 +276,12 @@ class Asn1Value(object): :param contents: A byte string of the encoded contents of the value + :param method: + The method for the value - no default value since this is + normally set on a class. Valid values include: + - "primitive" or 0 + - "constructed" or 1 + :raises: ValueError - when implicit, explicit, tag_type, class_ or tag are invalid values """ @@ -384,7 +391,7 @@ class Asn1Value(object): self.implicit = True else: if class_ is not None: - if class_ not in CLASS_NUM_TO_NAME_MAP: + if class_ not in CLASS_NAME_TO_NUM_MAP: raise ValueError(unwrap( ''' class_ must be one of "universal", "application", @@ -394,9 +401,27 @@ class Asn1Value(object): )) self.class_ = CLASS_NAME_TO_NUM_MAP[class_] + if self.class_ is None: + self.class_ = 0 + if tag is not None: self.tag = tag + if method is not None: + if method not in set(["primitive", 0, "constructed", 1]): + raise ValueError(unwrap( + ''' + method must be one of "primitive" or "constructed", + not %s + ''', + repr(method) + )) + if method == "primitive": + method = 0 + elif method == "constructed": + method = 1 + self.method = method + if no_explicit: self.explicit = None @@ -603,6 +628,10 @@ class Asn1Value(object): contents = self.contents + # If the length is indefinite, force the re-encoding + if self._header is not None and self._header[-1:] == b'\x80': + force = True + if self._header is None or force: if isinstance(self, Constructable) and self._indefinite: self.method = 0 @@ -616,7 +645,7 @@ class Asn1Value(object): self._header = header self._trailer = b'' - return self._header + contents + return self._header + contents + self._trailer class ValueMap(): @@ -700,10 +729,6 @@ class Constructable(object): # length when parsed - affects parsing and dumping _indefinite = False - # Class attribute that indicates the offset into self.contents - # that contains the chunks of data to merge - _chunks_offset = 0 - def _merge_chunks(self): """ :return: @@ -713,7 +738,7 @@ class Constructable(object): if not self._indefinite: return self._as_chunk() - pointer = self._chunks_offset + pointer = 0 contents_len = len(self.contents) output = None @@ -740,9 +765,21 @@ class Constructable(object): byte strings, unicode strings or tuples. """ - if self._chunks_offset == 0: - return self.contents - return self.contents[self._chunks_offset:] + return self.contents + + def _setable_native(self): + """ + Returns a native value that can be round-tripped into .set(), to + result in a DER encoding. This differs from .native in that .native + is designed for the end use, and may account for the fact that the + merged value is further parsed as ASN.1, such as in the case of + ParsableOctetString() and ParsableOctetBitString(). + + :return: + A python value that is valid to pass to .set() + """ + + return self.native def _copy(self, other, copy_func): """ @@ -757,8 +794,10 @@ class Constructable(object): """ super(Constructable, self)._copy(other, copy_func) - self.method = other.method - self._indefinite = other._indefinite + # We really don't want to dump BER encodings, so if we see an + # indefinite encoding, let's re-encode it + if other._indefinite: + self.set(other._setable_native()) class Void(Asn1Value): @@ -792,7 +831,7 @@ class Void(Asn1Value): @property def native(self): """ - The a native Python datatype representation of this value + The native Python datatype representation of this value :return: None @@ -860,7 +899,7 @@ class Any(Asn1Value): @property def native(self): """ - The a native Python datatype representation of this value + The native Python datatype representation of this value :return: The .native value from the parsed value object @@ -982,6 +1021,10 @@ class Choice(Asn1Value): # The Asn1Value object for the chosen alternative _parsed = None + # Choice overrides .contents to be a property so that the code expecting + # the .contents attribute will get the .contents of the chosen alternative + _contents = None + # A list of tuples in one of the following forms. # # Option 1, a unicode string field name and a value class @@ -1045,8 +1088,8 @@ class Choice(Asn1Value): :param name: The name of the alternative to be set - used with value. Alternatively this may be a dict with a single key being the name - and the value being the value, or a two-element tuple of the the - name and the value. + and the value being the value, or a two-element tuple of the name + and the value. :param value: The alternative value to set - used with name @@ -1122,6 +1165,27 @@ class Choice(Asn1Value): raise e @property + def contents(self): + """ + :return: + A byte string of the DER-encoded contents of the chosen alternative + """ + + if self._parsed is not None: + return self._parsed.contents + + return self._contents + + @contents.setter + def contents(self, value): + """ + :param value: + A byte string of the DER-encoded contents of the chosen alternative + """ + + self._contents = value + + @property def name(self): """ :return: @@ -1139,16 +1203,15 @@ class Choice(Asn1Value): An Asn1Value object of the chosen alternative """ - if self._parsed is not None: - return self._parsed - - try: - _, spec, params = self._alternatives[self._choice] - self._parsed, _ = _parse_build(self.contents, spec=spec, spec_params=params) - except (ValueError, TypeError) as e: - args = e.args[1:] - e.args = (e.args[0] + '\n while parsing %s' % type_name(self),) + args - raise e + if self._parsed is None: + try: + _, spec, params = self._alternatives[self._choice] + self._parsed, _ = _parse_build(self._contents, spec=spec, spec_params=params) + except (ValueError, TypeError) as e: + args = e.args[1:] + e.args = (e.args[0] + '\n while parsing %s' % type_name(self),) + args + raise e + return self._parsed @property def chosen(self): @@ -1162,7 +1225,7 @@ class Choice(Asn1Value): @property def native(self): """ - The a native Python datatype representation of this value + The native Python datatype representation of this value :return: The .native value from the contained value object @@ -1271,13 +1334,17 @@ class Choice(Asn1Value): A byte string of the DER-encoded value """ - self.contents = self.chosen.dump(force=force) + # If the length is indefinite, force the re-encoding + if self._header is not None and self._header[-1:] == b'\x80': + force = True + + self._contents = self.chosen.dump(force=force) if self._header is None or force: self._header = b'' if self.explicit is not None: for class_, tag in self.explicit: - self._header = _dump_header(class_, 1, tag, self._header + self.contents) + self._header - return self._header + self.contents + self._header = _dump_header(class_, 1, tag, self._header + self._contents) + self._header + return self._header + self._contents class Concat(object): @@ -1644,6 +1711,10 @@ class Primitive(Asn1Value): A byte string of the DER-encoded value """ + # If the length is indefinite, force the re-encoding + if self._header is not None and self._header[-1:] == b'\x80': + force = True + if force: native = self.native self.contents = None @@ -1761,7 +1832,7 @@ class AbstractString(Constructable, Primitive): @property def native(self): """ - The a native Python datatype representation of this value + The native Python datatype representation of this value :return: A unicode string or None @@ -1812,7 +1883,7 @@ class Boolean(Primitive): @property def native(self): """ - The a native Python datatype representation of this value + The native Python datatype representation of this value :return: True, False or None @@ -1891,7 +1962,7 @@ class Integer(Primitive, ValueMap): @property def native(self): """ - The a native Python datatype representation of this value + The native Python datatype representation of this value :return: An integer or None @@ -1907,7 +1978,115 @@ class Integer(Primitive, ValueMap): return self._native -class BitString(Constructable, Castable, Primitive, ValueMap, object): +class _IntegerBitString(object): + """ + A mixin for IntegerBitString and BitString to parse the contents as an integer. + """ + + # Tuple of 1s and 0s; set through native + _unused_bits = () + + def _as_chunk(self): + """ + Parse the contents of a primitive BitString encoding as an integer value. + Allows reconstructing indefinite length values. + + :raises: + ValueError - when an invalid value is passed + + :return: + A list with one tuple (value, bits, unused_bits) where value is an integer + with the value of the BitString, bits is the bit count of value and + unused_bits is a tuple of 1s and 0s. + """ + + if self._indefinite: + # return an empty chunk, for cases like \x23\x80\x00\x00 + return [] + + unused_bits_len = ord(self.contents[0]) if _PY2 else self.contents[0] + value = int_from_bytes(self.contents[1:]) + bits = (len(self.contents) - 1) * 8 + + if not unused_bits_len: + return [(value, bits, ())] + + if len(self.contents) == 1: + # Disallowed by X.690 §8.6.2.3 + raise ValueError('Empty bit string has {0} unused bits'.format(unused_bits_len)) + + if unused_bits_len > 7: + # Disallowed by X.690 §8.6.2.2 + raise ValueError('Bit string has {0} unused bits'.format(unused_bits_len)) + + unused_bits = _int_to_bit_tuple(value & ((1 << unused_bits_len) - 1), unused_bits_len) + value >>= unused_bits_len + bits -= unused_bits_len + + return [(value, bits, unused_bits)] + + def _chunks_to_int(self): + """ + Combines the chunks into a single value. + + :raises: + ValueError - when an invalid value is passed + + :return: + A tuple (value, bits, unused_bits) where value is an integer with the + value of the BitString, bits is the bit count of value and unused_bits + is a tuple of 1s and 0s. + """ + + if not self._indefinite: + # Fast path + return self._as_chunk()[0] + + value = 0 + total_bits = 0 + unused_bits = () + + # X.690 §8.6.3 allows empty indefinite encodings + for chunk, bits, unused_bits in self._merge_chunks(): + if total_bits & 7: + # Disallowed by X.690 §8.6.4 + raise ValueError('Only last chunk in a bit string may have unused bits') + total_bits += bits + value = (value << bits) | chunk + + return value, total_bits, unused_bits + + def _copy(self, other, copy_func): + """ + Copies the contents of another _IntegerBitString object to itself + + :param object: + Another instance of the same class + + :param copy_func: + An reference of copy.copy() or copy.deepcopy() to use when copying + lists, dicts and objects + """ + + super(_IntegerBitString, self)._copy(other, copy_func) + self._unused_bits = other._unused_bits + + @property + def unused_bits(self): + """ + The unused bits of the bit string encoding. + + :return: + A tuple of 1s and 0s + """ + + # call native to set _unused_bits + self.native + + return self._unused_bits + + +class BitString(_IntegerBitString, Constructable, Castable, Primitive, ValueMap): """ Represents a bit string from ASN.1 as a Python tuple of 1s and 0s """ @@ -1916,10 +2095,6 @@ class BitString(Constructable, Castable, Primitive, ValueMap, object): _size = None - # Used with _as_chunk() from Constructable - _chunk = None - _chunks_offset = 1 - def _setup(self): """ Generates _reverse_map from _map @@ -1983,8 +2158,6 @@ class BitString(Constructable, Castable, Primitive, ValueMap, object): type_name(value) )) - self._chunk = None - if self._map is not None: if len(value) > self._size: raise ValueError(unwrap( @@ -2024,6 +2197,7 @@ class BitString(Constructable, Castable, Primitive, ValueMap, object): value_bytes = (b'\x00' * (size_in_bytes - len(value_bytes))) + value_bytes self.contents = extra_bits_byte + value_bytes + self._unused_bits = (0,) * extra_bits self._header = None if self._indefinite: self._indefinite = False @@ -2135,40 +2309,10 @@ class BitString(Constructable, Castable, Primitive, ValueMap, object): self.set(self._native) - def _as_chunk(self): - """ - Allows reconstructing indefinite length values - - :return: - A tuple of integers - """ - - extra_bits = int_from_bytes(self.contents[0:1]) - bit_string = '{0:b}'.format(int_from_bytes(self.contents[1:])) - byte_len = len(self.contents[1:]) - bit_len = len(bit_string) - - # Left-pad the bit string to a byte multiple to ensure we didn't - # lose any zero bits on the left - mod_bit_len = bit_len % 8 - if mod_bit_len != 0: - bit_string = ('0' * (8 - mod_bit_len)) + bit_string - bit_len = len(bit_string) - - if bit_len // 8 < byte_len: - missing_bytes = byte_len - (bit_len // 8) - bit_string = ('0' * (8 * missing_bytes)) + bit_string - - # Trim off the extra bits on the right used to fill the last byte - if extra_bits > 0: - bit_string = bit_string[0:0 - extra_bits] - - return tuple(map(int, tuple(bit_string))) - @property def native(self): """ - The a native Python datatype representation of this value + The native Python datatype representation of this value :return: If a _map is set, a set of names, or if no _map is set, a tuple of @@ -2183,7 +2327,9 @@ class BitString(Constructable, Castable, Primitive, ValueMap, object): self.set(set()) if self._native is None: - bits = self._merge_chunks() + int_value, bit_count, self._unused_bits = self._chunks_to_int() + bits = _int_to_bit_tuple(int_value, bit_count) + if self._map: self._native = set() for index, bit in enumerate(bits): @@ -2202,15 +2348,12 @@ class OctetBitString(Constructable, Castable, Primitive): tag = 3 - # Whenever dealing with octet-based bit strings, we really want the - # bytes, so we just ignore the unused bits portion since it isn't - # applicable to the current use case - # unused_bits = struct.unpack('>B', self.contents[0:1])[0] - _chunks_offset = 1 - # Instance attribute of (possibly-merged) byte string _bytes = None + # Tuple of 1s and 0s; set through native + _unused_bits = () + def set(self, value): """ Sets the value of the object @@ -2234,6 +2377,7 @@ class OctetBitString(Constructable, Castable, Primitive): self._bytes = value # Set the unused bits to 0 self.contents = b'\x00' + value + self._unused_bits = () self._header = None if self._indefinite: self._indefinite = False @@ -2250,7 +2394,18 @@ class OctetBitString(Constructable, Castable, Primitive): if self.contents is None: return b'' if self._bytes is None: - self._bytes = self._merge_chunks() + if not self._indefinite: + self._bytes, self._unused_bits = self._as_chunk()[0] + else: + chunks = self._merge_chunks() + self._unused_bits = () + for chunk in chunks: + if self._unused_bits: + # Disallowed by X.690 §8.6.4 + raise ValueError('Only last chunk in a bit string may have unused bits') + self._unused_bits = chunk[1] + self._bytes = b''.join(chunk[0] for chunk in chunks) + return self._bytes def _copy(self, other, copy_func): @@ -2267,11 +2422,46 @@ class OctetBitString(Constructable, Castable, Primitive): super(OctetBitString, self)._copy(other, copy_func) self._bytes = other._bytes + self._unused_bits = other._unused_bits + + def _as_chunk(self): + """ + Allows reconstructing indefinite length values + + :raises: + ValueError - when an invalid value is passed + + :return: + List with one tuple, consisting of a byte string and an integer (unused bits) + """ + + unused_bits_len = ord(self.contents[0]) if _PY2 else self.contents[0] + if not unused_bits_len: + return [(self.contents[1:], ())] + + if len(self.contents) == 1: + # Disallowed by X.690 §8.6.2.3 + raise ValueError('Empty bit string has {0} unused bits'.format(unused_bits_len)) + + if unused_bits_len > 7: + # Disallowed by X.690 §8.6.2.2 + raise ValueError('Bit string has {0} unused bits'.format(unused_bits_len)) + + mask = (1 << unused_bits_len) - 1 + last_byte = ord(self.contents[-1]) if _PY2 else self.contents[-1] + + # zero out the unused bits in the last byte. + zeroed_byte = last_byte & ~mask + value = self.contents[1:-1] + (chr(zeroed_byte) if _PY2 else bytes((zeroed_byte,))) + + unused_bits = _int_to_bit_tuple(last_byte & mask, unused_bits_len) + + return [(value, unused_bits)] @property def native(self): """ - The a native Python datatype representation of this value + The native Python datatype representation of this value :return: A byte string or None @@ -2282,16 +2472,28 @@ class OctetBitString(Constructable, Castable, Primitive): return self.__bytes__() + @property + def unused_bits(self): + """ + The unused bits of the bit string encoding. + + :return: + A tuple of 1s and 0s + """ + + # call native to set _unused_bits + self.native + + return self._unused_bits + -class IntegerBitString(Constructable, Castable, Primitive): +class IntegerBitString(_IntegerBitString, Constructable, Castable, Primitive): """ Represents a bit string in ASN.1 as a Python integer """ tag = 3 - _chunks_offset = 1 - def set(self, value): """ Sets the value of the object @@ -2306,15 +2508,25 @@ class IntegerBitString(Constructable, Castable, Primitive): if not isinstance(value, int_types): raise TypeError(unwrap( ''' - %s value must be an integer, not %s + %s value must be a positive integer, not %s ''', type_name(self), type_name(value) )) + if value < 0: + raise ValueError(unwrap( + ''' + %s value must be a positive integer, not %d + ''', + type_name(self), + value + )) + self._native = value # Set the unused bits to 0 self.contents = b'\x00' + int_to_bytes(value, signed=True) + self._unused_bits = () self._header = None if self._indefinite: self._indefinite = False @@ -2322,31 +2534,10 @@ class IntegerBitString(Constructable, Castable, Primitive): if self._trailer != b'': self._trailer = b'' - def _as_chunk(self): - """ - Allows reconstructing indefinite length values - - :return: - A unicode string of bits - 1s and 0s - """ - - extra_bits = int_from_bytes(self.contents[0:1]) - bit_string = '{0:b}'.format(int_from_bytes(self.contents[1:])) - - # Ensure we have leading zeros since these chunks may be concatenated together - mod_bit_len = len(bit_string) % 8 - if mod_bit_len != 0: - bit_string = ('0' * (8 - mod_bit_len)) + bit_string - - if extra_bits > 0: - return bit_string[0:0 - extra_bits] - - return bit_string - @property def native(self): """ - The a native Python datatype representation of this value + The native Python datatype representation of this value :return: An integer or None @@ -2356,14 +2547,8 @@ class IntegerBitString(Constructable, Castable, Primitive): return None if self._native is None: - extra_bits = int_from_bytes(self.contents[0:1]) - # Fast path - if not self._indefinite and extra_bits == 0: - self._native = int_from_bytes(self.contents[1:]) - else: - if self._indefinite and extra_bits > 0: - raise ValueError('Constructed bit string has extra bits on indefinite container') - self._native = int(self._merge_chunks(), 2) + self._native, __, self._unused_bits = self._chunks_to_int() + return self._native @@ -2433,7 +2618,7 @@ class OctetString(Constructable, Castable, Primitive): @property def native(self): """ - The a native Python datatype representation of this value + The native Python datatype representation of this value :return: A byte string or None @@ -2452,6 +2637,12 @@ class IntegerOctetString(Constructable, Castable, Primitive): tag = 4 + # An explicit length in bytes the integer should be encoded to. This should + # generally not be used since DER defines a canonical encoding, however some + # use of this, such as when storing elliptic curve private keys, requires an + # exact number of bytes, even if the leading bytes are null. + _encoded_width = None + def set(self, value): """ Sets the value of the object @@ -2466,14 +2657,23 @@ class IntegerOctetString(Constructable, Castable, Primitive): if not isinstance(value, int_types): raise TypeError(unwrap( ''' - %s value must be an integer, not %s + %s value must be a positive integer, not %s ''', type_name(self), type_name(value) )) + if value < 0: + raise ValueError(unwrap( + ''' + %s value must be a positive integer, not %d + ''', + type_name(self), + value + )) + self._native = value - self.contents = int_to_bytes(value, signed=False) + self.contents = int_to_bytes(value, signed=False, width=self._encoded_width) self._header = None if self._indefinite: self._indefinite = False @@ -2484,7 +2684,7 @@ class IntegerOctetString(Constructable, Castable, Primitive): @property def native(self): """ - The a native Python datatype representation of this value + The native Python datatype representation of this value :return: An integer or None @@ -2497,6 +2697,19 @@ class IntegerOctetString(Constructable, Castable, Primitive): self._native = int_from_bytes(self._merge_chunks()) return self._native + def set_encoded_width(self, width): + """ + Set the explicit enoding width for the integer + + :param width: + An integer byte width to encode the integer to + """ + + self._encoded_width = width + # Make sure the encoded value is up-to-date with the proper width + if self.contents is not None and len(self.contents) != width: + self.set(self.native) + class ParsableOctetString(Constructable, Castable, Primitive): @@ -2592,6 +2805,16 @@ class ParsableOctetString(Constructable, Castable, Primitive): self._bytes = self._merge_chunks() return self._bytes + def _setable_native(self): + """ + Returns a byte string that can be passed into .set() + + :return: + A python value that is valid to pass to .set() + """ + + return self.__bytes__() + def _copy(self, other, copy_func): """ Copies the contents of another ParsableOctetString object to itself @@ -2611,7 +2834,7 @@ class ParsableOctetString(Constructable, Castable, Primitive): @property def native(self): """ - The a native Python datatype representation of this value + The native Python datatype representation of this value :return: A byte string or None @@ -2651,6 +2874,10 @@ class ParsableOctetString(Constructable, Castable, Primitive): A byte string of the DER-encoded value """ + # If the length is indefinite, force the re-encoding + if self._indefinite: + force = True + if force: if self._parsed is not None: native = self.parsed.dump(force=force) @@ -2666,12 +2893,6 @@ class ParsableOctetBitString(ParsableOctetString): tag = 3 - # Whenever dealing with octet-based bit strings, we really want the - # bytes, so we just ignore the unused bits portion since it isn't - # applicable to the current use case - # unused_bits = struct.unpack('>B', self.contents[0:1])[0] - _chunks_offset = 1 - def set(self, value): """ Sets the value of the object @@ -2702,6 +2923,23 @@ class ParsableOctetBitString(ParsableOctetString): if self._trailer != b'': self._trailer = b'' + def _as_chunk(self): + """ + Allows reconstructing indefinite length values + + :raises: + ValueError - when an invalid value is passed + + :return: + A byte string + """ + + unused_bits_len = ord(self.contents[0]) if _PY2 else self.contents[0] + if unused_bits_len: + raise ValueError('ParsableOctetBitString should have no unused bits') + + return self.contents[1:] + class Null(Primitive): """ @@ -2725,7 +2963,7 @@ class Null(Primitive): @property def native(self): """ - The a native Python datatype representation of this value + The native Python datatype representation of this value :return: None @@ -2919,7 +3157,7 @@ class ObjectIdentifier(Primitive, ValueMap): @property def native(self): """ - The a native Python datatype representation of this value + The native Python datatype representation of this value :return: A unicode string or None. If _map is not defined, the unicode string @@ -2932,8 +3170,8 @@ class ObjectIdentifier(Primitive, ValueMap): if self._native is None: self._native = self.dotted - if self._map is not None and self._native in self._map: - self._native = self._map[self._native] + if self._map is not None and self._native in self._map: + self._native = self._map[self._native] return self._native @@ -3015,7 +3253,7 @@ class Enumerated(Integer): @property def native(self): """ - The a native Python datatype representation of this value + The native Python datatype representation of this value :return: A unicode string or None @@ -3312,8 +3550,6 @@ class Sequence(Asn1Value): invalid_value = False if isinstance(new_value, Any): invalid_value = new_value.parsed is None - elif isinstance(new_value, Choice): - invalid_value = new_value.chosen.contents is None else: invalid_value = new_value.contents is None @@ -3527,7 +3763,10 @@ class Sequence(Asn1Value): is_any = issubclass(field_spec, Any) if issubclass(value_spec, Choice): - if not isinstance(value, Asn1Value): + is_asn1value = isinstance(value, Asn1Value) + is_tuple = isinstance(value, tuple) and len(value) == 2 + is_dict = isinstance(value, dict) and len(value) == 1 + if not is_asn1value and not is_tuple and not is_dict: raise ValueError(unwrap( ''' Can not set a native python value to %s, which has the @@ -3536,6 +3775,8 @@ class Sequence(Asn1Value): field_name, type_name(value_spec) )) + if is_tuple or is_dict: + value = value_spec(value) if not isinstance(value, value_spec): wrapper = value_spec() wrapper.validate(value.class_, value.tag, value.contents) @@ -3550,12 +3791,30 @@ class Sequence(Asn1Value): new_value.parse(value_spec) elif (not specs_different or is_any) and not isinstance(value, value_spec): + if (not is_any or specs_different) and isinstance(value, Asn1Value): + raise TypeError(unwrap( + ''' + %s value must be %s, not %s + ''', + field_name, + type_name(value_spec), + type_name(value) + )) new_value = value_spec(value, **field_params) else: if isinstance(value, value_spec): new_value = value else: + if isinstance(value, Asn1Value): + raise TypeError(unwrap( + ''' + %s value must be %s, not %s + ''', + field_name, + type_name(value_spec), + type_name(value) + )) new_value = value_spec(value) # For when the field is OctetString or OctetBitString with embedded @@ -3701,6 +3960,7 @@ class Sequence(Asn1Value): index += 1 except (ValueError, TypeError) as e: + self.children = None args = e.args[1:] e.args = (e.args[0] + '\n while parsing %s' % type_name(self),) + args raise e @@ -3747,7 +4007,7 @@ class Sequence(Asn1Value): @property def native(self): """ - The a native Python datatype representation of this value + The native Python datatype representation of this value :return: An OrderedDict or None. If an OrderedDict, all child values are @@ -3772,6 +4032,7 @@ class Sequence(Asn1Value): name = str_cls(index) self._native[name] = child.native except (ValueError, TypeError) as e: + self._native = None args = e.args[1:] e.args = (e.args[0] + '\n while parsing %s' % type_name(self),) + args raise e @@ -3826,6 +4087,10 @@ class Sequence(Asn1Value): A byte string of the DER-encoded value """ + # If the length is indefinite, force the re-encoding + if self._header is not None and self._header[-1:] == b'\x80': + force = True + if force: self._set_contents(force=force) @@ -4204,6 +4469,7 @@ class SequenceOf(Asn1Value): child._parse_children(recurse=True) self.children.append(child) except (ValueError, TypeError) as e: + self.children = None args = e.args[1:] e.args = (e.args[0] + '\n while parsing %s' % type_name(self),) + args raise e @@ -4222,7 +4488,7 @@ class SequenceOf(Asn1Value): @property def native(self): """ - The a native Python datatype representation of this value + The native Python datatype representation of this value :return: A list or None. If a list, all child values are recursively @@ -4289,6 +4555,10 @@ class SequenceOf(Asn1Value): A byte string of the DER-encoded value """ + # If the length is indefinite, force the re-encoding + if self._header is not None and self._header[-1:] == b'\x80': + force = True + if force: self._set_contents(force=force) @@ -4572,53 +4842,134 @@ class AbstractTime(AbstractString): """ @property + def _parsed_time(self): + """ + The parsed datetime string. + + :raises: + ValueError - when an invalid value is passed + + :return: + A dict with the parsed values + """ + + string = str_cls(self) + + m = self._TIMESTRING_RE.match(string) + if not m: + raise ValueError(unwrap( + ''' + Error parsing %s to a %s + ''', + string, + type_name(self), + )) + + groups = m.groupdict() + + tz = None + if groups['zulu']: + tz = timezone.utc + elif groups['dsign']: + sign = 1 if groups['dsign'] == '+' else -1 + tz = create_timezone(sign * timedelta( + hours=int(groups['dhour']), + minutes=int(groups['dminute'] or 0) + )) + + if groups['fraction']: + # Compute fraction in microseconds + fract = Fraction( + int(groups['fraction']), + 10 ** len(groups['fraction']) + ) * 1000000 + + if groups['minute'] is None: + fract *= 3600 + elif groups['second'] is None: + fract *= 60 + + fract_usec = int(fract.limit_denominator(1)) + + else: + fract_usec = 0 + + return { + 'year': int(groups['year']), + 'month': int(groups['month']), + 'day': int(groups['day']), + 'hour': int(groups['hour']), + 'minute': int(groups['minute'] or 0), + 'second': int(groups['second'] or 0), + 'tzinfo': tz, + 'fraction': fract_usec, + } + + @property def native(self): """ - The a native Python datatype representation of this value + The native Python datatype representation of this value :return: - A datetime.datetime object in the UTC timezone or None + A datetime.datetime object, asn1crypto.util.extended_datetime object or + None. The datetime object is usually timezone aware. If it's naive, then + it's in the sender's local time; see X.680 sect. 42.3 """ if self.contents is None: return None if self._native is None: - string = str_cls(self) - has_timezone = re.search('[-\\+]', string) + parsed = self._parsed_time - # We don't know what timezone it is in, or it is UTC because of a Z - # suffix, so we just assume UTC - if not has_timezone: - string = string.rstrip('Z') - date = self._date_by_len(string) - self._native = date.replace(tzinfo=timezone.utc) + fraction = parsed.pop('fraction', 0) - else: - # Python 2 doesn't support the %z format code, so we have to manually - # process the timezone offset. - date = self._date_by_len(string[0:-5]) - - hours = int(string[-4:-2]) - minutes = int(string[-2:]) - delta = timedelta(hours=abs(hours), minutes=minutes) - if hours < 0: - date -= delta - else: - date += delta + value = self._get_datetime(parsed) + + if fraction: + value += timedelta(microseconds=fraction) - self._native = date.replace(tzinfo=timezone.utc) + self._native = value return self._native class UTCTime(AbstractTime): """ - Represents a UTC time from ASN.1 as a Python datetime.datetime object in UTC + Represents a UTC time from ASN.1 as a timezone aware Python datetime.datetime object """ tag = 23 + # Regular expression for UTCTime as described in X.680 sect. 43 and ISO 8601 + _TIMESTRING_RE = re.compile(r''' + ^ + # YYMMDD + (?P<year>\d{2}) + (?P<month>\d{2}) + (?P<day>\d{2}) + + # hhmm or hhmmss + (?P<hour>\d{2}) + (?P<minute>\d{2}) + (?P<second>\d{2})? + + # Matches nothing, needed because GeneralizedTime uses this. + (?P<fraction>) + + # Z or [-+]hhmm + (?: + (?P<zulu>Z) + | + (?: + (?P<dsign>[-+]) + (?P<dhour>\d{2}) + (?P<dminute>\d{2}) + ) + ) + $ + ''', re.X) + def set(self, value): """ Sets the value of the object @@ -4631,6 +4982,15 @@ class UTCTime(AbstractTime): """ if isinstance(value, datetime): + if not value.tzinfo: + raise ValueError('Must be timezone aware') + + # Convert value to UTC. + value = value.astimezone(utc_with_dst) + + if not 1950 <= value.year <= 2049: + raise ValueError('Year of the UTCTime is not in range [1950, 2049], use GeneralizedTime instead') + value = value.strftime('%y%m%d%H%M%SZ') if _PY2: value = value.decode('ascii') @@ -4640,32 +5000,24 @@ class UTCTime(AbstractTime): # time that .native is called self._native = None - def _date_by_len(self, string): + def _get_datetime(self, parsed): """ - Parses a date from a string based on its length - - :param string: - A unicode string to parse + Create a datetime object from the parsed time. :return: - A datetime.datetime object or a unicode string + An aware datetime.datetime object """ - strlen = len(string) - - year_num = int(string[0:2]) - if year_num < 50: - prefix = '20' + # X.680 only specifies that UTCTime is not using a century. + # So "18" could as well mean 2118 or 1318. + # X.509 and CMS specify to use UTCTime for years earlier than 2050. + # Assume that UTCTime is only used for years [1950, 2049]. + if parsed['year'] < 50: + parsed['year'] += 2000 else: - prefix = '19' - - if strlen == 10: - return datetime.strptime(prefix + string, '%Y%m%d%H%M') + parsed['year'] += 1900 - if strlen == 12: - return datetime.strptime(prefix + string, '%Y%m%d%H%M%S') - - return string + return datetime(**parsed) class GeneralizedTime(AbstractTime): @@ -4676,6 +5028,44 @@ class GeneralizedTime(AbstractTime): tag = 24 + # Regular expression for GeneralizedTime as described in X.680 sect. 42 and ISO 8601 + _TIMESTRING_RE = re.compile(r''' + ^ + # YYYYMMDD + (?P<year>\d{4}) + (?P<month>\d{2}) + (?P<day>\d{2}) + + # hh or hhmm or hhmmss + (?P<hour>\d{2}) + (?: + (?P<minute>\d{2}) + (?P<second>\d{2})? + )? + + # Optional fraction; [.,]dddd (one or more decimals) + # If Seconds are given, it's fractions of Seconds. + # Else if Minutes are given, it's fractions of Minutes. + # Else it's fractions of Hours. + (?: + [,.] + (?P<fraction>\d+) + )? + + # Optional timezone. If left out, the time is in local time. + # Z or [-+]hh or [-+]hhmm + (?: + (?P<zulu>Z) + | + (?: + (?P<dsign>[-+]) + (?P<dhour>\d{2}) + (?P<dminute>\d{2})? + ) + )? + $ + ''', re.X) + def set(self, value): """ Sets the value of the object @@ -4689,7 +5079,18 @@ class GeneralizedTime(AbstractTime): """ if isinstance(value, (datetime, extended_datetime)): - value = value.strftime('%Y%m%d%H%M%SZ') + if not value.tzinfo: + raise ValueError('Must be timezone aware') + + # Convert value to UTC. + value = value.astimezone(utc_with_dst) + + if value.microsecond: + fraction = '.' + str(value.microsecond).zfill(6).rstrip('0') + else: + fraction = '' + + value = value.strftime('%Y%m%d%H%M%S') + fraction + 'Z' if _PY2: value = value.decode('ascii') @@ -4698,47 +5099,20 @@ class GeneralizedTime(AbstractTime): # time that .native is called self._native = None - def _date_by_len(self, string): + def _get_datetime(self, parsed): """ - Parses a date from a string based on its length - - :param string: - A unicode string to parse + Create a datetime object from the parsed time. :return: - A datetime.datetime object, asn1crypto.util.extended_datetime object or - a unicode string - """ - - strlen = len(string) - - date_format = None - if strlen == 10: - date_format = '%Y%m%d%H' - elif strlen == 12: - date_format = '%Y%m%d%H%M' - elif strlen == 14: - date_format = '%Y%m%d%H%M%S' - elif strlen == 18: - date_format = '%Y%m%d%H%M%S.%f' - - if date_format: - if len(string) >= 4 and string[0:4] == '0000': - # Year 2000 shares a calendar with year 0, and is supported natively - t = datetime.strptime('2000' + string[4:], date_format) - return extended_datetime( - 0, - t.month, - t.day, - t.hour, - t.minute, - t.second, - t.microsecond, - t.tzinfo - ) - return datetime.strptime(string, date_format) + A datetime.datetime object or asn1crypto.util.extended_datetime object. + It may or may not be aware. + """ - return string + if parsed['year'] == 0: + # datetime does not support year 0. Use extended_datetime instead. + return extended_datetime(**parsed) + else: + return datetime(**parsed) class GraphicString(AbstractString): @@ -4839,6 +5213,9 @@ def _basic_debug(prefix, self): elif has_header: print('%s %s %s tag %s' % (prefix, method_name, class_name, self.tag)) + if self._trailer: + print('%s Trailer: 0x%s' % (prefix, binascii.hexlify(self._trailer or b'').decode('utf-8'))) + print('%s Data: 0x%s' % (prefix, binascii.hexlify(self.contents or b'').decode('utf-8'))) @@ -4916,7 +5293,7 @@ def _build_id_tuple(params, spec): A 2-element integer tuple in the form (class_, tag) """ - # Handle situations where the the spec is not known at setup time + # Handle situations where the spec is not known at setup time if spec is None: return (None, None) @@ -4946,6 +5323,30 @@ def _build_id_tuple(params, spec): return (required_class, required_tag) +def _int_to_bit_tuple(value, bits): + """ + Format value as a tuple of 1s and 0s. + + :param value: + A non-negative integer to format + + :param bits: + Number of bits in the output + + :return: + A tuple of 1s and 0s with bits members. + """ + + if not value and not bits: + return () + + result = tuple(map(int, format(value, '0{0}b'.format(bits)))) + if len(result) != bits: + raise ValueError('Result too large: {0} > {1}'.format(len(result), bits)) + + return result + + _UNIVERSAL_SPECS = { 1: Boolean, 2: Integer, @@ -5078,8 +5479,10 @@ def _build(class_, method, tag, header, contents, trailer, spec=None, spec_param )) info, _ = _parse(to_parse, len(to_parse)) parsed_class, parsed_method, parsed_tag, parsed_header, to_parse, parsed_trailer = info - explicit_header += parsed_header - explicit_trailer = parsed_trailer + explicit_trailer + + if not isinstance(value, Choice): + explicit_header += parsed_header + explicit_trailer = parsed_trailer + explicit_trailer value = _build(*info, spec=spec, spec_params={'no_explicit': True}) value._header = explicit_header @@ -5134,15 +5537,20 @@ def _build(class_, method, tag, header, contents, trailer, spec=None, spec_param else: value.method = method value._indefinite = True - if tag != value.tag and tag != value._bad_tag: - raise ValueError(unwrap( - ''' - Error parsing %s - tag should have been %s, but %s was found - ''', - type_name(value), - value.tag, - tag - )) + if tag != value.tag: + if isinstance(value._bad_tag, tuple): + is_bad_tag = tag in value._bad_tag + else: + is_bad_tag = tag == value._bad_tag + if not is_bad_tag: + raise ValueError(unwrap( + ''' + Error parsing %s - tag should have been %s, but %s was found + ''', + type_name(value), + value.tag, + tag + )) # For explicitly tagged, un-speced parsings, we use a generic container # since we will be parsing the contents and discarding the outer object diff --git a/asn1crypto/keys.py b/asn1crypto/keys.py index 9a09a31..3d447e3 100644 --- a/asn1crypto/keys.py +++ b/asn1crypto/keys.py @@ -19,17 +19,8 @@ from __future__ import unicode_literals, division, absolute_import, print_functi import hashlib import math -from ._elliptic_curve import ( - SECP192R1_BASE_POINT, - SECP224R1_BASE_POINT, - SECP256R1_BASE_POINT, - SECP384R1_BASE_POINT, - SECP521R1_BASE_POINT, - PrimeCurve, - PrimePoint, -) -from ._errors import unwrap -from ._types import type_name, str_cls, byte_cls +from ._errors import unwrap, APIException +from ._types import type_name, byte_cls from .algos import _ForceNullParameters, DigestAlgorithm, EncryptionAlgorithm, RSAESOAEPParams from .core import ( Any, @@ -49,6 +40,7 @@ from .core import ( SetOf, ) from .util import int_from_bytes, int_to_bytes +from asn1crypto.algos import RSASSAPSSParams class OtherPrimeInfo(Sequence): @@ -363,7 +355,9 @@ class NamedCurve(ObjectIdentifier): '1.2.840.10045.3.1.5': 'prime239v2', '1.2.840.10045.3.1.6': 'prime239v3', # https://tools.ietf.org/html/rfc5480#page-5 + # http://www.secg.org/sec2-v2.pdf '1.3.132.0.1': 'sect163k1', + '1.3.132.0.10': 'secp256k1', '1.3.132.0.15': 'sect163r2', '1.2.840.10045.3.1.1': 'secp192r1', '1.3.132.0.33': 'secp224r1', @@ -380,6 +374,76 @@ class NamedCurve(ObjectIdentifier): '1.3.132.0.39': 'sect571r1', } + _key_sizes = { + # Order values used to compute these sourced from + # http://cr.openjdk.java.net/~vinnie/7194075/webrev-3/src/share/classes/sun/security/ec/CurveDB.java.html + '1.2.840.10045.3.0.1': 21, + '1.2.840.10045.3.0.2': 21, + '1.2.840.10045.3.0.3': 21, + '1.2.840.10045.3.0.4': 21, + '1.2.840.10045.3.0.5': 24, + '1.2.840.10045.3.0.6': 24, + '1.2.840.10045.3.0.7': 24, + '1.2.840.10045.3.0.8': 24, + '1.2.840.10045.3.0.9': 24, + '1.2.840.10045.3.0.10': 25, + '1.2.840.10045.3.0.11': 30, + '1.2.840.10045.3.0.12': 30, + '1.2.840.10045.3.0.13': 30, + '1.2.840.10045.3.0.14': 30, + '1.2.840.10045.3.0.15': 30, + '1.2.840.10045.3.0.16': 33, + '1.2.840.10045.3.0.17': 37, + '1.2.840.10045.3.0.18': 45, + '1.2.840.10045.3.0.19': 45, + '1.2.840.10045.3.0.20': 53, + '1.2.840.10045.3.1.2': 24, + '1.2.840.10045.3.1.3': 24, + '1.2.840.10045.3.1.4': 30, + '1.2.840.10045.3.1.5': 30, + '1.2.840.10045.3.1.6': 30, + # Order values used to compute these sourced from + # http://www.secg.org/SEC2-Ver-1.0.pdf + '1.3.132.0.1': 21, + '1.3.132.0.10': 32, + '1.3.132.0.15': 21, + '1.2.840.10045.3.1.1': 24, + '1.3.132.0.33': 28, + '1.3.132.0.26': 29, + '1.2.840.10045.3.1.7': 32, + '1.3.132.0.27': 29, + '1.3.132.0.16': 36, + '1.3.132.0.17': 36, + '1.3.132.0.34': 48, + '1.3.132.0.36': 51, + '1.3.132.0.37': 51, + '1.3.132.0.35': 66, + '1.3.132.0.38': 72, + '1.3.132.0.39': 72, + } + + @classmethod + def register(cls, name, oid, key_size): + """ + Registers a new named elliptic curve that is not included in the + default list of named curves + + :param name: + A unicode string of the curve name + + :param oid: + A unicode string of the dotted format OID + + :param key_size: + An integer of the number of bytes the private key should be + encoded to + """ + + cls._map[oid] = name + if cls._reverse_map is not None: + cls._reverse_map[name] = oid + cls._key_sizes[oid] = key_size + class ECDomainParameters(Choice): """ @@ -392,6 +456,31 @@ class ECDomainParameters(Choice): ('implicit_ca', Null), ] + @property + def key_size(self): + if self.name == 'implicit_ca': + raise ValueError(unwrap( + ''' + Unable to calculate key_size from ECDomainParameters + that are implicitly defined by the CA key + ''' + )) + + if self.name == 'specified': + order = self.chosen['order'].native + return math.ceil(math.log(order, 2.0) / 8.0) + + oid = self.chosen.dotted + if oid not in NamedCurve._key_sizes: + raise ValueError(unwrap( + ''' + The asn1crypto.keys.NamedCurve %s does not have a registered key length, + please call asn1crypto.keys.NamedCurve.register() + ''', + repr(oid) + )) + return NamedCurve._key_sizes[oid] + class ECPrivateKeyVersion(Integer): """ @@ -416,6 +505,48 @@ class ECPrivateKey(Sequence): ('public_key', ECPointBitString, {'explicit': 1, 'optional': True}), ] + # Ensures the key is set to the correct length when encoding + _key_size = None + + # This is necessary to ensure the private_key IntegerOctetString is encoded properly + def __setitem__(self, key, value): + res = super(ECPrivateKey, self).__setitem__(key, value) + + if key == 'private_key': + if self._key_size is None: + # Infer the key_size from the existing private key if possible + pkey_contents = self['private_key'].contents + if isinstance(pkey_contents, byte_cls) and len(pkey_contents) > 1: + self.set_key_size(len(self['private_key'].contents)) + + elif self._key_size is not None: + self._update_key_size() + + elif key == 'parameters' and isinstance(self['parameters'], ECDomainParameters) and \ + self['parameters'].name != 'implicit_ca': + self.set_key_size(self['parameters'].key_size) + + return res + + def set_key_size(self, key_size): + """ + Sets the key_size to ensure the private key is encoded to the proper length + + :param key_size: + An integer byte length to encode the private_key to + """ + + self._key_size = key_size + self._update_key_size() + + def _update_key_size(self): + """ + Ensure the private_key explicit encoding width is set + """ + + if self._key_size is not None and isinstance(self['private_key'], IntegerOctetString): + self['private_key'].set_encoded_width(self._key_size) + class DSAParams(Sequence): """ @@ -463,6 +594,8 @@ class PrivateKeyAlgorithmId(ObjectIdentifier): _map = { # https://tools.ietf.org/html/rfc3279#page-19 '1.2.840.113549.1.1.1': 'rsa', + # https://tools.ietf.org/html/rfc4055#page-8 + '1.2.840.113549.1.1.10': 'rsassa_pss', # https://tools.ietf.org/html/rfc3279#page-18 '1.2.840.10040.4.1': 'dsa', # https://tools.ietf.org/html/rfc3279#page-13 @@ -485,6 +618,7 @@ class PrivateKeyAlgorithm(_ForceNullParameters, Sequence): _oid_specs = { 'dsa': DSAParams, 'ec': ECDomainParameters, + 'rsassa_pss': RSASSAPSSParams, } @@ -504,6 +638,7 @@ class PrivateKeyInfo(Sequence): algorithm = self['private_key_algorithm']['algorithm'].native return { 'rsa': RSAPrivateKey, + 'rsassa_pss': RSAPrivateKey, 'dsa': Integer, 'ec': ECPrivateKey, }[algorithm] @@ -585,78 +720,24 @@ class PrivateKeyInfo(Sequence): return container - def _compute_public_key(self): - """ - Computes the public key corresponding to the current private key. + # This is necessary to ensure any contained ECPrivateKey is the + # correct size + def __setitem__(self, key, value): + res = super(PrivateKeyInfo, self).__setitem__(key, value) - :return: - For RSA keys, an RSAPublicKey object. For DSA keys, an Integer - object. For EC keys, an ECPointBitString. - """ + algorithm = self['private_key_algorithm'] - if self.algorithm == 'dsa': - params = self['private_key_algorithm']['parameters'] - return Integer(pow( - params['g'].native, - self['private_key'].parsed.native, - params['p'].native - )) + # When possible, use the parameter info to make sure the private key encoding + # retains any necessary leading bytes, instead of them being dropped + if (key == 'private_key_algorithm' or key == 'private_key') and \ + algorithm['algorithm'].native == 'ec' and \ + isinstance(algorithm['parameters'], ECDomainParameters) and \ + algorithm['parameters'].name != 'implicit_ca' and \ + isinstance(self['private_key'], ParsableOctetString) and \ + isinstance(self['private_key'].parsed, ECPrivateKey): + self['private_key'].parsed.set_key_size(algorithm['parameters'].key_size) - if self.algorithm == 'rsa': - key = self['private_key'].parsed - return RSAPublicKey({ - 'modulus': key['modulus'], - 'public_exponent': key['public_exponent'], - }) - - if self.algorithm == 'ec': - curve_type, details = self.curve - - if curve_type == 'implicit_ca': - raise ValueError(unwrap( - ''' - Unable to compute public key for EC key using Implicit CA - parameters - ''' - )) - - if curve_type == 'specified': - if details['field_id']['field_type'] == 'characteristic_two_field': - raise ValueError(unwrap( - ''' - Unable to compute public key for EC key over a - characteristic two field - ''' - )) - - curve = PrimeCurve( - details['field_id']['parameters'], - int_from_bytes(details['curve']['a']), - int_from_bytes(details['curve']['b']) - ) - base_x, base_y = self['private_key_algorithm']['parameters'].chosen['base'].to_coords() - base_point = PrimePoint(curve, base_x, base_y) - - elif curve_type == 'named': - if details not in ('secp192r1', 'secp224r1', 'secp256r1', 'secp384r1', 'secp521r1'): - raise ValueError(unwrap( - ''' - Unable to compute public key for EC named curve %s, - parameters not currently included - ''', - details - )) - - base_point = { - 'secp192r1': SECP192R1_BASE_POINT, - 'secp224r1': SECP224R1_BASE_POINT, - 'secp256r1': SECP256R1_BASE_POINT, - 'secp384r1': SECP384R1_BASE_POINT, - 'secp521r1': SECP521R1_BASE_POINT, - }[details] - - public_point = base_point * self['private_key'].parsed['private_key'].native - return ECPointBitString.from_coords(public_point.x, public_point.y) + return res def unwrap(self): """ @@ -667,25 +748,9 @@ class PrivateKeyInfo(Sequence): An RSAPrivateKey, DSAPrivateKey or ECPrivateKey object """ - if self.algorithm == 'rsa': - return self['private_key'].parsed - - if self.algorithm == 'dsa': - params = self['private_key_algorithm']['parameters'] - return DSAPrivateKey({ - 'version': 0, - 'p': params['p'], - 'q': params['q'], - 'g': params['g'], - 'public_key': self.public_key, - 'private_key': self['private_key'].parsed, - }) - - if self.algorithm == 'ec': - output = self['private_key'].parsed - output['parameters'] = self['private_key_algorithm']['parameters'] - output['public_key'] = self.public_key - return output + raise APIException( + 'asn1crypto.keys.PrivateKeyInfo().unwrap() has been removed, ' + 'please use oscrypto.asymmetric.PrivateKey().unwrap() instead') @property def curve(self): @@ -795,17 +860,9 @@ class PrivateKeyInfo(Sequence): object. If an EC key, an ECPointBitString object. """ - if self._public_key is None: - if self.algorithm == 'ec': - key = self['private_key'].parsed - if key['public_key']: - self._public_key = key['public_key'].untag() - else: - self._public_key = self._compute_public_key() - else: - self._public_key = self._compute_public_key() - - return self._public_key + raise APIException( + 'asn1crypto.keys.PrivateKeyInfo().public_key has been removed, ' + 'please use oscrypto.asymmetric.PrivateKey().public_key.unwrap() instead') @property def public_key_info(self): @@ -814,13 +871,9 @@ class PrivateKeyInfo(Sequence): A PublicKeyInfo object derived from this private key. """ - return PublicKeyInfo({ - 'algorithm': { - 'algorithm': self.algorithm, - 'parameters': self['private_key_algorithm']['parameters'] - }, - 'public_key': self.public_key - }) + raise APIException( + 'asn1crypto.keys.PrivateKeyInfo().public_key_info has been removed, ' + 'please use oscrypto.asymmetric.PrivateKey().public_key.asn1 instead') @property def fingerprint(self): @@ -836,51 +889,9 @@ class PrivateKeyInfo(Sequence): on the key type) """ - if self._fingerprint is None: - params = self['private_key_algorithm']['parameters'] - key = self['private_key'].parsed - - if self.algorithm == 'rsa': - to_hash = '%d:%d' % ( - key['modulus'].native, - key['public_exponent'].native, - ) - - elif self.algorithm == 'dsa': - public_key = self.public_key - to_hash = '%d:%d:%d:%d' % ( - params['p'].native, - params['q'].native, - params['g'].native, - public_key.native, - ) - - elif self.algorithm == 'ec': - public_key = key['public_key'].native - if public_key is None: - public_key = self.public_key.native - - if params.name == 'named': - to_hash = '%s:' % params.chosen.native - to_hash = to_hash.encode('utf-8') - to_hash += public_key - - elif params.name == 'implicit_ca': - to_hash = public_key - - elif params.name == 'specified': - to_hash = '%s:' % params.chosen['field_id']['parameters'].native - to_hash = to_hash.encode('utf-8') - to_hash += b':' + params.chosen['curve']['a'].native - to_hash += b':' + params.chosen['curve']['b'].native - to_hash += public_key - - if isinstance(to_hash, str_cls): - to_hash = to_hash.encode('utf-8') - - self._fingerprint = hashlib.sha256(to_hash).digest() - - return self._fingerprint + raise APIException( + 'asn1crypto.keys.PrivateKeyInfo().fingerprint has been removed, ' + 'please use oscrypto.asymmetric.PrivateKey().fingerprint instead') class EncryptedPrivateKeyInfo(Sequence): @@ -932,6 +943,8 @@ class PublicKeyAlgorithmId(ObjectIdentifier): '1.2.840.113549.1.1.1': 'rsa', # https://tools.ietf.org/html/rfc3447#page-47 '1.2.840.113549.1.1.7': 'rsaes_oaep', + # https://tools.ietf.org/html/rfc4055#page-8 + '1.2.840.113549.1.1.10': 'rsassa_pss', # https://tools.ietf.org/html/rfc3279#page-18 '1.2.840.10040.4.1': 'dsa', # https://tools.ietf.org/html/rfc3279#page-13 @@ -958,6 +971,7 @@ class PublicKeyAlgorithm(_ForceNullParameters, Sequence): 'ec': ECDomainParameters, 'dh': DomainParameters, 'rsaes_oaep': RSAESOAEPParams, + 'rsassa_pss': RSASSAPSSParams, } @@ -977,6 +991,7 @@ class PublicKeyInfo(Sequence): return { 'rsa': RSAPublicKey, 'rsaes_oaep': RSAPublicKey, + 'rsassa_pss': RSAPublicKey, 'dsa': Integer, # We override the field spec with ECPoint so that users can easily # decompose the byte string into the constituent X and Y coords @@ -1046,19 +1061,9 @@ class PublicKeyInfo(Sequence): An RSAPublicKey object """ - if self.algorithm == 'rsa': - return self['public_key'].parsed - - key_type = self.algorithm.upper() - a_an = 'an' if key_type == 'EC' else 'a' - raise ValueError(unwrap( - ''' - Only RSA public keys may be unwrapped - this key is %s %s public - key - ''', - a_an, - key_type - )) + raise APIException( + 'asn1crypto.keys.PublicKeyInfo().unwrap() has been removed, ' + 'please use oscrypto.asymmetric.PublicKey().unwrap() instead') @property def curve(self): @@ -1203,47 +1208,6 @@ class PublicKeyInfo(Sequence): on the key type) """ - if self._fingerprint is None: - key_type = self['algorithm']['algorithm'].native - params = self['algorithm']['parameters'] - - if key_type == 'rsa': - key = self['public_key'].parsed - to_hash = '%d:%d' % ( - key['modulus'].native, - key['public_exponent'].native, - ) - - elif key_type == 'dsa': - key = self['public_key'].parsed - to_hash = '%d:%d:%d:%d' % ( - params['p'].native, - params['q'].native, - params['g'].native, - key.native, - ) - - elif key_type == 'ec': - key = self['public_key'] - - if params.name == 'named': - to_hash = '%s:' % params.chosen.native - to_hash = to_hash.encode('utf-8') - to_hash += key.native - - elif params.name == 'implicit_ca': - to_hash = key.native - - elif params.name == 'specified': - to_hash = '%s:' % params.chosen['field_id']['parameters'].native - to_hash = to_hash.encode('utf-8') - to_hash += b':' + params.chosen['curve']['a'].native - to_hash += b':' + params.chosen['curve']['b'].native - to_hash += key.native - - if isinstance(to_hash, str_cls): - to_hash = to_hash.encode('utf-8') - - self._fingerprint = hashlib.sha256(to_hash).digest() - - return self._fingerprint + raise APIException( + 'asn1crypto.keys.PublicKeyInfo().fingerprint has been removed, ' + 'please use oscrypto.asymmetric.PublicKey().fingerprint instead') diff --git a/asn1crypto/ocsp.py b/asn1crypto/ocsp.py index f18d8e8..91c7fbf 100644 --- a/asn1crypto/ocsp.py +++ b/asn1crypto/ocsp.py @@ -12,6 +12,7 @@ Other type classes are defined that help compose the types listed above. from __future__ import unicode_literals, division, absolute_import, print_function +from ._errors import unwrap from .algos import DigestAlgorithm, SignedDigestAlgorithm from .core import ( Boolean, @@ -319,6 +320,56 @@ class ResponderId(Choice): ] +# Custom class to return a meaningful .native attribute from CertStatus() +class StatusGood(Null): + def set(self, value): + """ + Sets the value of the object + + :param value: + None or 'good' + """ + + if value is not None and value != 'good' and not isinstance(value, Null): + raise ValueError(unwrap( + ''' + value must be one of None, "good", not %s + ''', + repr(value) + )) + + self.contents = b'' + + @property + def native(self): + return 'good' + + +# Custom class to return a meaningful .native attribute from CertStatus() +class StatusUnknown(Null): + def set(self, value): + """ + Sets the value of the object + + :param value: + None or 'unknown' + """ + + if value is not None and value != 'unknown' and not isinstance(value, Null): + raise ValueError(unwrap( + ''' + value must be one of None, "unknown", not %s + ''', + repr(value) + )) + + self.contents = b'' + + @property + def native(self): + return 'unknown' + + class RevokedInfo(Sequence): _fields = [ ('revocation_time', GeneralizedTime), @@ -328,9 +379,9 @@ class RevokedInfo(Sequence): class CertStatus(Choice): _alternatives = [ - ('good', Null, {'implicit': 0}), + ('good', StatusGood, {'implicit': 0}), ('revoked', RevokedInfo, {'implicit': 1}), - ('unknown', Null, {'implicit': 2}), + ('unknown', StatusUnknown, {'implicit': 2}), ] diff --git a/asn1crypto/parser.py b/asn1crypto/parser.py index 07f53ab..c4f91f6 100644 --- a/asn1crypto/parser.py +++ b/asn1crypto/parser.py @@ -201,12 +201,6 @@ def _parse(encoded_data, data_len, pointer=0, lengths_only=False): # just scanned looking for \x00\x00, nested indefinite length values # would not work. contents_end = pointer - # Unfortunately we need to understand the contents of the data to - # properly scan forward, which bleeds some representation info into - # the parser. This condition handles the unused bits byte in - # constructed bit strings. - if tag == 3: - contents_end += 1 while contents_end < data_len: sub_header_end, contents_end = _parse(encoded_data, data_len, contents_end, lengths_only=True) if contents_end == sub_header_end and encoded_data[contents_end - 2:contents_end] == b'\x00\x00': @@ -270,11 +264,13 @@ def _dump_header(class_, method, tag, contents): id_num |= method << 5 if tag >= 31: - header += chr_cls(id_num | 31) + cont_bit = 0 while tag > 0: - continuation_bit = 0x80 if tag > 0x7F else 0 - header += chr_cls(continuation_bit | (tag & 0x7F)) + header = chr_cls(cont_bit | (tag & 0x7f)) + header + if not cont_bit: + cont_bit = 0x80 tag = tag >> 7 + header = chr_cls(id_num | 31) + header else: header += chr_cls(id_num | tag) diff --git a/asn1crypto/util.py b/asn1crypto/util.py index 2e55ef8..4d743df 100644 --- a/asn1crypto/util.py +++ b/asn1crypto/util.py @@ -8,6 +8,8 @@ from bytes and UTC timezone. Exports the following items: - int_from_bytes() - int_to_bytes() - timezone.utc + - utc_with_dst + - create_timezone() - inet_ntop() - inet_pton() - uri_to_iri() @@ -18,7 +20,7 @@ from __future__ import unicode_literals, division, absolute_import, print_functi import math import sys -from datetime import datetime, date, time +from datetime import datetime, date, timedelta, tzinfo from ._errors import unwrap from ._iri import iri_to_uri, uri_to_iri # noqa @@ -34,10 +36,6 @@ else: # Python 2 if sys.version_info <= (3,): - from datetime import timedelta, tzinfo - - py2 = True - def int_to_bytes(value, signed=False, width=None): """ Converts an integer to a byte string @@ -49,13 +47,16 @@ if sys.version_info <= (3,): If the byte string should be encoded using two's complement :param width: - None == auto, otherwise an integer of the byte width for the return - value + If None, the minimal possible size (but at least 1), + otherwise an integer of the byte width for the return value :return: A byte string """ + if value == 0 and width == 0: + return b'' + # Handle negatives in two's complement is_neg = False if signed and value < 0: @@ -73,6 +74,8 @@ if sys.version_info <= (3,): output = b'\x00' + output if width is not None: + if len(output) > width: + raise OverflowError('int too big to convert') if is_neg: pad_char = b'\xFF' else: @@ -112,29 +115,92 @@ if sys.version_info <= (3,): return num - class utc(tzinfo): # noqa + class timezone(tzinfo): # noqa + """ + Implements datetime.timezone for py2. + Only full minute offsets are supported. + DST is not supported. + """ + + def __init__(self, offset, name=None): + """ + :param offset: + A timedelta with this timezone's offset from UTC - def tzname(self, _): - return b'UTC+00:00' + :param name: + Name of the timezone; if None, generate one. + """ - def utcoffset(self, _): - return timedelta(0) + if not timedelta(hours=-24) < offset < timedelta(hours=24): + raise ValueError('Offset must be in [-23:59, 23:59]') - def dst(self, _): - return timedelta(0) + if offset.seconds % 60 or offset.microseconds: + raise ValueError('Offset must be full minutes') + + self._offset = offset + + if name is not None: + self._name = name + elif not offset: + self._name = 'UTC' + else: + self._name = 'UTC' + _format_offset(offset) + + def __eq__(self, other): + """ + Compare two timezones + + :param other: + The other timezone to compare to + + :return: + A boolean + """ + + if type(other) != timezone: + return False + return self._offset == other._offset + + def tzname(self, dt): + """ + :param dt: + A datetime object; ignored. - class timezone(): # noqa + :return: + Name of this timezone + """ - utc = utc() + return self._name + def utcoffset(self, dt): + """ + :param dt: + A datetime object; ignored. + + :return: + A timedelta object with the offset from UTC + """ + + return self._offset + + def dst(self, dt): + """ + :param dt: + A datetime object; ignored. + + :return: + Zero timedelta + """ + + return timedelta(0) + + timezone.utc = timezone(timedelta(0)) # Python 3 else: from datetime import timezone # noqa - py2 = False - def int_to_bytes(value, signed=False, width=None): """ Converts an integer to a byte string @@ -146,8 +212,8 @@ else: If the byte string should be encoded using two's complement :param width: - None == auto, otherwise an integer of the byte width for the return - value + If None, the minimal possible size (but at least 1), + otherwise an integer of the byte width for the return value :return: A byte string @@ -183,31 +249,66 @@ else: return int.from_bytes(value, 'big', signed=signed) -_DAYS_PER_MONTH_YEAR_0 = { - 1: 31, - 2: 29, # Year 0 was a leap year - 3: 31, - 4: 30, - 5: 31, - 6: 30, - 7: 31, - 8: 31, - 9: 30, - 10: 31, - 11: 30, - 12: 31 -} +def _format_offset(off): + """ + Format a timedelta into "[+-]HH:MM" format or "" for None + """ + if off is None: + return '' + mins = off.days * 24 * 60 + off.seconds // 60 + sign = '-' if mins < 0 else '+' + return sign + '%02d:%02d' % divmod(abs(mins), 60) -class extended_date(object): + +class _UtcWithDst(tzinfo): """ - A datetime.date-like object that can represent the year 0. This is just - to handle 0000-01-01 found in some certificates. + Utc class where dst does not return None; required for astimezone """ - year = None - month = None - day = None + def tzname(self, dt): + return 'UTC' + + def utcoffset(self, dt): + return timedelta(0) + + def dst(self, dt): + return timedelta(0) + + +utc_with_dst = _UtcWithDst() + +_timezone_cache = {} + + +def create_timezone(offset): + """ + Returns a new datetime.timezone object with the given offset. + Uses cached objects if possible. + + :param offset: + A datetime.timedelta object; It needs to be in full minutes and between -23:59 and +23:59. + + :return: + A datetime.timezone object + """ + + try: + tz = _timezone_cache[offset] + except KeyError: + tz = _timezone_cache[offset] = timezone(offset) + return tz + + +class extended_date(object): + """ + A datetime.datetime-like object that represents the year 0. This is just + to handle 0000-01-01 found in some certificates. Python's datetime does + not support year 0. + + The proleptic gregorian calendar repeats itself every 400 years. Therefore, + the simplest way to format is to substitute year 2000. + """ def __init__(self, year, month, day): """ @@ -224,73 +325,63 @@ class extended_date(object): if year != 0: raise ValueError('year must be 0') - if month < 1 or month > 12: - raise ValueError('month is out of range') - - if day < 0 or day > _DAYS_PER_MONTH_YEAR_0[month]: - raise ValueError('day is out of range') - - self.year = year - self.month = month - self.day = day + self._y2k = date(2000, month, day) - def _format(self, format): + @property + def year(self): + """ + :return: + The integer 0 """ - Performs strftime(), always returning a unicode string - :param format: - A strftime() format string + return 0 + @property + def month(self): + """ :return: - A unicode string of the formatted date - """ - - format = format.replace('%Y', '0000') - # Year 0 is 1BC and a leap year. Leap years repeat themselves - # every 28 years. Because of adjustments and the proleptic gregorian - # calendar, the simplest way to format is to substitute year 2000. - temp = date(2000, self.month, self.day) - if '%c' in format: - c_out = temp.strftime('%c') - # Handle full years - c_out = c_out.replace('2000', '0000') - c_out = c_out.replace('%', '%%') - format = format.replace('%c', c_out) - if '%x' in format: - x_out = temp.strftime('%x') - # Handle formats such as 08/16/2000 or 16.08.2000 - x_out = x_out.replace('2000', '0000') - x_out = x_out.replace('%', '%%') - format = format.replace('%x', x_out) - return temp.strftime(format) - - def isoformat(self): + An integer from 1 to 12 """ - Formats the date as %Y-%m-%d + return self._y2k.month + + @property + def day(self): + """ :return: - The date formatted to %Y-%m-%d as a unicode string in Python 3 - and a byte string in Python 2 + An integer from 1 to 31 """ - return self.strftime('0000-%m-%d') + return self._y2k.day def strftime(self, format): """ Formats the date using strftime() :param format: - The strftime() format string + A strftime() format string :return: - The formatted date as a unicode string in Python 3 and a byte - string in Python 2 + A str, the formatted date as a unicode string + in Python 3 and a byte string in Python 2 """ - output = self._format(format) - if py2: - return output.encode('utf-8') - return output + # Format the date twice, once with year 2000, once with year 4000. + # The only differences in the result will be in the millennium. Find them and replace by zeros. + y2k = self._y2k.strftime(format) + y4k = self._y2k.replace(year=4000).strftime(format) + return ''.join('0' if (c2, c4) == ('2', '4') else c2 for c2, c4 in zip(y2k, y4k)) + + def isoformat(self): + """ + Formats the date as %Y-%m-%d + + :return: + The date formatted to %Y-%m-%d as a unicode string in Python 3 + and a byte string in Python 2 + """ + + return self.strftime('0000-%m-%d') def replace(self, year=None, month=None, day=None): """ @@ -320,23 +411,40 @@ class extended_date(object): ) def __str__(self): - if py2: - return self.__bytes__() - else: - return self.__unicode__() - - def __bytes__(self): - return self.__unicode__().encode('utf-8') + """ + :return: + A str representing this extended_date, e.g. "0000-01-01" + """ - def __unicode__(self): - return self._format('%Y-%m-%d') + return self.strftime('%Y-%m-%d') def __eq__(self, other): + """ + Compare two extended_date objects + + :param other: + The other extended_date to compare to + + :return: + A boolean + """ + + # datetime.date object wouldn't compare equal because it can't be year 0 if not isinstance(other, self.__class__): return False return self.__cmp__(other) == 0 def __ne__(self, other): + """ + Compare two extended_date objects + + :param other: + The other extended_date to compare to + + :return: + A boolean + """ + return not self.__eq__(other) def _comparison_error(self, other): @@ -349,26 +457,26 @@ class extended_date(object): )) def __cmp__(self, other): + """ + Compare two extended_date or datetime.date objects + + :param other: + The other extended_date object to compare to + + :return: + An integer smaller than, equal to, or larger than 0 + """ + + # self is year 0, other is >= year 1 if isinstance(other, date): return -1 if not isinstance(other, self.__class__): self._comparison_error(other) - st = ( - self.year, - self.month, - self.day - ) - ot = ( - other.year, - other.month, - other.day - ) - - if st < ot: + if self._y2k < other._y2k: return -1 - if st > ot: + if self._y2k > other._y2k: return 1 return 0 @@ -387,158 +495,147 @@ class extended_date(object): class extended_datetime(object): """ - A datetime.datetime-like object that can represent the year 0. This is just - to handle 0000-01-01 found in some certificates. + A datetime.datetime-like object that represents the year 0. This is just + to handle 0000-01-01 found in some certificates. Python's datetime does + not support year 0. + + The proleptic gregorian calendar repeats itself every 400 years. Therefore, + the simplest way to format is to substitute year 2000. """ - year = None - month = None - day = None - hour = None - minute = None - second = None - microsecond = None - tzinfo = None + # There are 97 leap days during 400 years. + DAYS_IN_400_YEARS = 400 * 365 + 97 + DAYS_IN_2000_YEARS = 5 * DAYS_IN_400_YEARS - def __init__(self, year, month, day, hour=0, minute=0, second=0, microsecond=0, tzinfo=None): + def __init__(self, year, *args, **kwargs): """ :param year: The integer 0 - :param month: - An integer from 1 to 12 + :param args: + Other positional arguments; see datetime.datetime. - :param day: - An integer from 1 to 31 + :param kwargs: + Other keyword arguments; see datetime.datetime. + """ - :param hour: - An integer from 0 to 23 + if year != 0: + raise ValueError('year must be 0') - :param minute: - An integer from 0 to 59 + self._y2k = datetime(2000, *args, **kwargs) - :param second: - An integer from 0 to 59 + @property + def year(self): + """ + :return: + The integer 0 + """ - :param microsecond: - An integer from 0 to 999999 + return 0 + + @property + def month(self): + """ + :return: + An integer from 1 to 12 """ - if year != 0: - raise ValueError('year must be 0') + return self._y2k.month - if month < 1 or month > 12: - raise ValueError('month is out of range') + @property + def day(self): + """ + :return: + An integer from 1 to 31 + """ - if day < 0 or day > _DAYS_PER_MONTH_YEAR_0[month]: - raise ValueError('day is out of range') + return self._y2k.day - if hour < 0 or hour > 23: - raise ValueError('hour is out of range') + @property + def hour(self): + """ + :return: + An integer from 1 to 24 + """ - if minute < 0 or minute > 59: - raise ValueError('minute is out of range') + return self._y2k.hour - if second < 0 or second > 59: - raise ValueError('second is out of range') + @property + def minute(self): + """ + :return: + An integer from 1 to 60 + """ - if microsecond < 0 or microsecond > 999999: - raise ValueError('microsecond is out of range') + return self._y2k.minute - self.year = year - self.month = month - self.day = day - self.hour = hour - self.minute = minute - self.second = second - self.microsecond = microsecond - self.tzinfo = tzinfo + @property + def second(self): + """ + :return: + An integer from 1 to 60 + """ - def date(self): + return self._y2k.second + + @property + def microsecond(self): """ :return: - An asn1crypto.util.extended_date of the date + An integer from 0 to 999999 """ - return extended_date(self.year, self.month, self.day) + return self._y2k.microsecond - def time(self): + @property + def tzinfo(self): """ :return: - A datetime.time object of the time + If object is timezone aware, a datetime.tzinfo object, else None. """ - return time(self.hour, self.minute, self.second, self.microsecond, self.tzinfo) + return self._y2k.tzinfo def utcoffset(self): """ :return: - None or a datetime.timedelta() of the offset from UTC + If object is timezone aware, a datetime.timedelta object, else None. """ - if self.tzinfo is None: - return None - return self.tzinfo.utcoffset(self.replace(year=2000)) + return self._y2k.utcoffset() - def dst(self): + def time(self): """ :return: - None or a datetime.timedelta() of the daylight savings time offset + A datetime.time object """ - if self.tzinfo is None: - return None - return self.tzinfo.dst(self.replace(year=2000)) + return self._y2k.time() - def tzname(self): + def date(self): """ :return: - None or the name of the timezone as a unicode string in Python 3 - and a byte string in Python 2 + An asn1crypto.util.extended_date of the date """ - if self.tzinfo is None: - return None - return self.tzinfo.tzname(self.replace(year=2000)) + return extended_date(0, self.month, self.day) - def _format(self, format): + def strftime(self, format): """ - Performs strftime(), always returning a unicode string + Performs strftime(), always returning a str :param format: A strftime() format string :return: - A unicode string of the formatted datetime + A str of the formatted datetime """ - format = format.replace('%Y', '0000') - # Year 0 is 1BC and a leap year. Leap years repeat themselves - # every 28 years. Because of adjustments and the proleptic gregorian - # calendar, the simplest way to format is to substitute year 2000. - temp = datetime( - 2000, - self.month, - self.day, - self.hour, - self.minute, - self.second, - self.microsecond, - self.tzinfo - ) - if '%c' in format: - c_out = temp.strftime('%c') - # Handle full years - c_out = c_out.replace('2000', '0000') - c_out = c_out.replace('%', '%%') - format = format.replace('%c', c_out) - if '%x' in format: - x_out = temp.strftime('%x') - # Handle formats such as 08/16/2000 or 16.08.2000 - x_out = x_out.replace('2000', '0000') - x_out = x_out.replace('%', '%%') - format = format.replace('%x', x_out) - return temp.strftime(format) + # Format the datetime twice, once with year 2000, once with year 4000. + # The only differences in the result will be in the millennium. Find them and replace by zeros. + y2k = self._y2k.strftime(format) + y4k = self._y2k.replace(year=4000).strftime(format) + return ''.join('0' if (c2, c4) == ('2', '4') else c2 for c2, c4 in zip(y2k, y4k)) def isoformat(self, sep='T'): """ @@ -554,91 +651,97 @@ class extended_datetime(object): string in Python 2 """ - if self.microsecond == 0: - return self.strftime('0000-%%m-%%d%s%%H:%%M:%%S' % sep) - return self.strftime('0000-%%m-%%d%s%%H:%%M:%%S.%%f' % sep) + s = '0000-%02d-%02d%c%02d:%02d:%02d' % (self.month, self.day, sep, self.hour, self.minute, self.second) + if self.microsecond: + s += '.%06d' % self.microsecond + return s + _format_offset(self.utcoffset()) - def strftime(self, format): + def replace(self, year=None, *args, **kwargs): """ - Formats the date using strftime() + Returns a new datetime.datetime or asn1crypto.util.extended_datetime + object with the specified components replaced - :param format: - The strftime() format string + :param year: + The new year to substitute. None to keep it. + + :param args: + Other positional arguments; see datetime.datetime.replace. + + :param kwargs: + Other keyword arguments; see datetime.datetime.replace. :return: - The formatted date as a unicode string in Python 3 and a byte - string in Python 2 + A datetime.datetime or asn1crypto.util.extended_datetime object """ - output = self._format(format) - if py2: - return output.encode('utf-8') - return output + if year: + return self._y2k.replace(year, *args, **kwargs) + + return extended_datetime.from_y2k(self._y2k.replace(2000, *args, **kwargs)) - def replace(self, year=None, month=None, day=None, hour=None, minute=None, - second=None, microsecond=None, tzinfo=None): + def astimezone(self, tz): """ - Returns a new datetime.datetime or asn1crypto.util.extended_datetime - object with the specified components replaced + Convert this extended_datetime to another timezone. + + :param tz: + A datetime.tzinfo object. :return: - A datetime.datetime or asn1crypto.util.extended_datetime object + A new extended_datetime or datetime.datetime object """ - if year is None: - year = self.year - if month is None: - month = self.month - if day is None: - day = self.day - if hour is None: - hour = self.hour - if minute is None: - minute = self.minute - if second is None: - second = self.second - if microsecond is None: - microsecond = self.microsecond - if tzinfo is None: - tzinfo = self.tzinfo + return extended_datetime.from_y2k(self._y2k.astimezone(tz)) - if year > 0: - cls = datetime - else: - cls = extended_datetime + def timestamp(self): + """ + Return POSIX timestamp. Only supported in python >= 3.3 - return cls( - year, - month, - day, - hour, - minute, - second, - microsecond, - tzinfo - ) + :return: + A float representing the seconds since 1970-01-01 UTC. This will be a negative value. + """ - def __str__(self): - if py2: - return self.__bytes__() - else: - return self.__unicode__() + return self._y2k.timestamp() - self.DAYS_IN_2000_YEARS * 86400 - def __bytes__(self): - return self.__unicode__().encode('utf-8') + def __str__(self): + """ + :return: + A str representing this extended_datetime, e.g. "0000-01-01 00:00:00.000001-10:00" + """ - def __unicode__(self): - format = '%Y-%m-%d %H:%M:%S' - if self.microsecond != 0: - format += '.%f' - return self._format(format) + return self.isoformat(sep=' ') def __eq__(self, other): - if not isinstance(other, self.__class__): + """ + Compare two extended_datetime objects + + :param other: + The other extended_datetime to compare to + + :return: + A boolean + """ + + # Only compare against other datetime or extended_datetime objects + if not isinstance(other, (self.__class__, datetime)): return False + + # Offset-naive and offset-aware datetimes are never the same + if (self.tzinfo is None) != (other.tzinfo is None): + return False + return self.__cmp__(other) == 0 def __ne__(self, other): + """ + Compare two extended_datetime objects + + :param other: + The other extended_datetime to compare to + + :return: + A boolean + """ + return not self.__eq__(other) def _comparison_error(self, other): @@ -660,42 +763,27 @@ class extended_datetime(object): )) def __cmp__(self, other): - so = self.utcoffset() - oo = other.utcoffset() + """ + Compare two extended_datetime or datetime.datetime objects - if (so is not None and oo is None) or (so is None and oo is not None): - raise TypeError("can't compare offset-naive and offset-aware datetimes") + :param other: + The other extended_datetime or datetime.datetime object to compare to - if isinstance(other, datetime): - return -1 + :return: + An integer smaller than, equal to, or larger than 0 + """ - if not isinstance(other, self.__class__): + if not isinstance(other, (self.__class__, datetime)): self._comparison_error(other) - st = ( - self.year, - self.month, - self.day, - self.hour, - self.minute, - self.second, - self.microsecond, - so - ) - ot = ( - other.year, - other.month, - other.day, - other.hour, - other.minute, - other.second, - other.microsecond, - oo - ) + if (self.tzinfo is None) != (other.tzinfo is None): + raise TypeError("can't compare offset-naive and offset-aware datetimes") - if st < ot: + diff = self - other + zero = timedelta(0) + if diff < zero: return -1 - if st > ot: + if diff > zero: return 1 return 0 @@ -710,3 +798,71 @@ class extended_datetime(object): def __ge__(self, other): return self.__cmp__(other) >= 0 + + def __add__(self, other): + """ + Adds a timedelta + + :param other: + A datetime.timedelta object to add. + + :return: + A new extended_datetime or datetime.datetime object. + """ + + return extended_datetime.from_y2k(self._y2k + other) + + def __sub__(self, other): + """ + Subtracts a timedelta or another datetime. + + :param other: + A datetime.timedelta or datetime.datetime or extended_datetime object to subtract. + + :return: + If a timedelta is passed, a new extended_datetime or datetime.datetime object. + Else a datetime.timedelta object. + """ + + if isinstance(other, timedelta): + return extended_datetime.from_y2k(self._y2k - other) + + if isinstance(other, extended_datetime): + return self._y2k - other._y2k + + if isinstance(other, datetime): + return self._y2k - other - timedelta(days=self.DAYS_IN_2000_YEARS) + + return NotImplemented + + def __rsub__(self, other): + return -(self - other) + + @classmethod + def from_y2k(cls, value): + """ + Revert substitution of year 2000. + + :param value: + A datetime.datetime object which is 2000 years in the future. + :return: + A new extended_datetime or datetime.datetime object. + """ + + year = value.year - 2000 + + if year > 0: + new_cls = datetime + else: + new_cls = cls + + return new_cls( + year, + value.month, + value.day, + value.hour, + value.minute, + value.second, + value.microsecond, + value.tzinfo + ) diff --git a/asn1crypto/version.py b/asn1crypto/version.py index 2ce2408..0c08d01 100644 --- a/asn1crypto/version.py +++ b/asn1crypto/version.py @@ -2,5 +2,5 @@ from __future__ import unicode_literals, division, absolute_import, print_function -__version__ = '0.24.0' -__version_info__ = (0, 24, 0) +__version__ = '1.0.0' +__version_info__ = (1, 0, 0) diff --git a/asn1crypto/x509.py b/asn1crypto/x509.py index 5a572a3..8341bb2 100644 --- a/asn1crypto/x509.py +++ b/asn1crypto/x509.py @@ -71,7 +71,7 @@ from .util import int_to_bytes, int_from_bytes, inet_ntop, inet_pton class DNSName(IA5String): _encoding = 'idna' - _bad_tag = 19 + _bad_tag = (12, 19) def __ne__(self, other): return not self == other @@ -163,7 +163,7 @@ class URI(IA5String): if not isinstance(other, URI): return False - return iri_to_uri(self.native) == iri_to_uri(other.native) + return iri_to_uri(self.native, True) == iri_to_uri(other.native, True) def __unicode__(self): """ @@ -185,6 +185,9 @@ class EmailAddress(IA5String): # If the value has gone through the .set() method, thus normalizing it _normalized = False + # In the wild we've seen this encoded as a UTF8String and PrintableString + _bad_tag = (12, 19) + @property def contents(self): """ @@ -240,13 +243,15 @@ class EmailAddress(IA5String): A unicode string """ + # We've seen this in the wild as a PrintableString, and since ascii is a + # subset of cp1252, we use the later for decoding to be more user friendly if self._unicode is None: contents = self._merge_chunks() if contents.find(b'@') == -1: - self._unicode = contents.decode('ascii') + self._unicode = contents.decode('cp1252') else: mailbox, hostname = contents.rsplit(b'@', 1) - self._unicode = mailbox.decode('ascii') + '@' + hostname.decode('idna') + self._unicode = mailbox.decode('cp1252') + '@' + hostname.decode('idna') return self._unicode def __ne__(self, other): @@ -372,7 +377,7 @@ class IPAddress(OctetString): @property def native(self): """ - The a native Python datatype representation of this value + The native Python datatype representation of this value :return: A unicode string or None @@ -384,6 +389,7 @@ class IPAddress(OctetString): if self._native is None: byte_string = self.__bytes__() byte_len = len(byte_string) + value = None cidr_int = None if byte_len in set([32, 16]): value = inet_ntop(socket.AF_INET6, byte_string[0:16]) @@ -1692,6 +1698,8 @@ class KeyPurposeId(ObjectIdentifier): '1.3.6.1.5.5.7.3.29': 'cmc_archive', # https://tools.ietf.org/html/draft-ietf-sidr-bgpsec-pki-profiles-15#page-6 '1.3.6.1.5.5.7.3.30': 'bgpspec_router', + # https://www.ietf.org/proceedings/44/I-D/draft-ietf-ipsec-pki-req-01.txt + '1.3.6.1.5.5.8.2.2': 'ike_intermediate', # https://msdn.microsoft.com/en-us/library/windows/desktop/aa378132(v=vs.85).aspx # and https://support.microsoft.com/en-us/kb/287547 '1.3.6.1.4.1.311.10.3.1': 'microsoft_trust_list_signing', @@ -2573,6 +2581,22 @@ class Certificate(Sequence): return self._issuer_serial @property + def not_valid_after(self): + """ + :return: + A datetime of latest time when the certificate is still valid + """ + return self['tbs_certificate']['validity']['not_after'].native + + @property + def not_valid_before(self): + """ + :return: + A datetime of the earliest time when the certificate is valid + """ + return self['tbs_certificate']['validity']['not_before'].native + + @property def authority_key_identifier(self): """ :return: diff --git a/changelog.md b/changelog.md index 9bbf933..3792d84 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,85 @@ # changelog +## 1.0.0 + + - Backwards Compatibility Breaks + - `cms.KeyEncryptionAlgorithmId().native` now returns the value + `"rsaes_pkcs1v15"` for OID `1.2.840.113549.1.1.1` instead of `"rsa"` + - Removed functionality to calculate public key values from private key + values. Alternatives have been added to oscrypto. + - `keys.PrivateKeyInfo().unwrap()` is now + `oscrypto.asymmetric.PrivateKey().unwrap()` + - `keys.PrivateKeyInfo().public_key` is now + `oscrypto.asymmetric.PrivateKey().public_key.unwrap()` + - `keys.PrivateKeyInfo().public_key_info` is now + `oscrypto.asymmetric.PrivateKey().public_key.asn1` + - `keys.PrivateKeyInfo().fingerprint` is now + `oscrypto.asymmetric.PrivateKey().fingerprint` + - `keys.PublicKeyInfo().unwrap()` is now + `oscrypto.asymmetric.PublicKey().unwrap()` + - `keys.PublicKeyInfo().fingerprint` is now + `oscrypto.asymmetric.PublicKey().fingerprint` + - Enhancements + - Significantly improved parsing of `core.UTCTime()` and + `core.GeneralizedTime()` values that include timezones and fractional + seconds + - `util.timezone` has a more complete implementation + - `core.Choice()` may now be constructed by a 2-element tuple or a 1-key + dict + - Added `x509.Certificate().not_valid_before` and + `x509.Certificate().not_valid_after` + - Added `core.BitString().unused_bits` + - Added `keys.NamedCurve.register()` for non-mainstream curve OIDs + - No longer try to load optional performance dependency, `libcrypto`, + on Mac or Linux + - `ocsp.CertStatus().native` will now return meaningful unicode string + values when the status choice is `"good"` or `"unknown"`. Previously + both returned `None` due to the way the structure was designed. + - Add support for explicit RSA SSA PSS (`1.2.840.113549.1.1.10`) to + `keys.PublicKeyInfo()` and `keys.PrivateKeyInfo()` + - Added structures for nested SHA-256 Windows PE signatures to + `cms.CMSAttribute()` + - Added RC4 (`1.2.840.113549.3.4`) to `algos.EncryptionAlgorithmId()` + - Added secp256k1 (`1.3.132.0.10`) to `keys.NamedCurve()` + - Added SHA-3 and SHAKE OIDs to `algos.DigestAlgorithmId()` and + `algos.HmacAlgorithmId()` + - Added RSA ES OAEP (`1.2.840.113549.1.1.7`) to + `cms.KeyEncryptionAlgorithmId()` + - Add IKE Intermediate (`1.3.6.1.5.5.8.2.2`) to `x509.KeyPurposeId()` + - `x509.EmailAddress()` and `x509.DNSName()` now handle invalidly-encoded + values using tags for `core.PrintableString()` and `core.UTF8String()` + - Add parameter structue from RFC 5084 for AES-CCM to + `algos.EncryptionAlgorithm()` + - Improved robustness of parsing broken `core.Sequence()` and + `core.SequenceOf()` values + - Bug Fixes + - Fixed encoding of tag values over 30 + - `core.IntegerBitString()` and `core.IntegerOctetString()` now restrict + values to non-negative integers since negative values are not + implemented + - When copying or dumping a BER-encoded indefinite-length value, + automatically force re-encoding to DER. *To ensure all nested values are + always DER-encoded, `.dump(True)` must be called.* + - Fix `UnboundLocalError` when calling `x509.IPAddress().native` on an + encoded value that has a length of zero + - Fixed passing `class_` via unicode string name to `core.Asn1Value()` + - Fixed a bug where EC private keys with leading null bytes would be + encoded in `keys.ECPrivateKey()` more narrowly than RFC 5915 requires + - Fixed some edge-case bugs in `util.int_to_bytes()` + - `x509.URI()` now only normalizes values when comparing + - Fixed BER-decoding of indefinite length `core.BitString()` + - Fixed DER-encoding of empty `core.BitString()` + - Fixed a missing return value for `core.Choice().parse()` + - Fixed `core.Choice().contents` working when the chosen alternative is a + `core.Choice()` also + - Fixed parsing and encoding of nested `core.Choice()` objects + - Fixed a bug causing `core.ObjectIdentifier().native` to sometimes not + map the OID + - Packaging + - `wheel`, `sdist` and `bdist_egg` releases now all include LICENSE, + `sdist` includes docs + - Added `asn1crypto_tests` package to PyPi + ## 0.24.0 - `x509.Certificate().self_signed` will no longer return `"yes"` under any diff --git a/circle.yml b/circle.yml deleted file mode 100644 index 19a1b75..0000000 --- a/circle.yml +++ /dev/null @@ -1,20 +0,0 @@ -machine: - pre: - - pip install --user --ignore-installed --upgrade virtualenv pip - - ln -s ~/Library/Python/2.7/bin/virtualenv /usr/local/bin/virtualenv - - brew update -dependencies: - override: - - brew install python3 pypy -test: - override: - - /usr/bin/python2.6 run.py deps - - /usr/bin/python2.6 run.py ci - - /usr/bin/python2.7 run.py deps - - /usr/bin/python2.7 run.py ci - - OSCRYPTO_USE_OPENSSL=/usr/lib/libcrypto.dylib,/usr/lib/libssl.dylib /usr/bin/python2.7 run.py ci - - /usr/local/bin/python3 run.py deps - - /usr/local/bin/python3 run.py ci - - OSCRYPTO_USE_OPENSSL=/usr/lib/libcrypto.dylib,/usr/lib/libssl.dylib /usr/local/bin/python3 run.py ci - - /usr/local/bin/pypy run.py deps - - /usr/local/bin/pypy run.py ci diff --git a/dev/__init__.py b/dev/__init__.py index e69de29..02e9c6c 100644 --- a/dev/__init__.py +++ b/dev/__init__.py @@ -0,0 +1,26 @@ +# coding: utf-8 +from __future__ import unicode_literals, division, absolute_import, print_function + +import os + + +package_name = "asn1crypto" + +other_packages = [ + "oscrypto", + "certbuilder", + "certvalidator", + "crlbuilder", + "csrbuilder", + "ocspbuilder" +] + +requires_oscrypto = False +has_tests_package = True + +package_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) +build_root = os.path.abspath(os.path.join(package_root, '..')) + +md_source_map = {} + +definition_replacements = {} diff --git a/dev/_import.py b/dev/_import.py new file mode 100644 index 0000000..2599588 --- /dev/null +++ b/dev/_import.py @@ -0,0 +1,93 @@ +# coding: utf-8 +from __future__ import unicode_literals, division, absolute_import, print_function + +import imp +import sys +import os + +from . import build_root + + +def _import_from(mod, path, mod_dir=None): + """ + Imports a module from a specific path + + :param mod: + A unicode string of the module name + + :param path: + A unicode string to the directory containing the module + + :param mod_dir: + If the sub directory of "path" is different than the "mod" name, + pass the sub directory as a unicode string + + :return: + None if not loaded, otherwise the module + """ + + if mod_dir is None: + mod_dir = mod + + if not os.path.exists(path): + return None + + if not os.path.exists(os.path.join(path, mod_dir)): + return None + + try: + mod_info = imp.find_module(mod_dir, [path]) + return imp.load_module(mod, *mod_info) + except ImportError: + return None + + +def _preload(require_oscrypto, print_info): + """ + Preloads asn1crypto and optionally oscrypto from a local source checkout, + or from a normal install + + :param require_oscrypto: + A bool if oscrypto needs to be preloaded + + :param print_info: + A bool if info about asn1crypto and oscrypto should be printed + """ + + if print_info: + print('Python ' + sys.version.replace('\n', '')) + + asn1crypto = None + oscrypto = None + + if require_oscrypto: + oscrypto_dir = os.path.join(build_root, 'oscrypto') + oscrypto_tests = None + if os.path.exists(oscrypto_dir): + oscrypto_tests = _import_from('oscrypto_tests', oscrypto_dir, 'tests') + if oscrypto_tests is None: + import oscrypto_tests + asn1crypto, oscrypto = oscrypto_tests.local_oscrypto() + + else: + asn1crypto_dir = os.path.join(build_root, 'asn1crypto') + if os.path.exists(asn1crypto_dir): + asn1crypto = _import_from('asn1crypto', asn1crypto_dir) + if asn1crypto is None: + import asn1crypto + + if print_info: + print( + '\nasn1crypto: %s, %s' % ( + asn1crypto.__version__, + os.path.dirname(asn1crypto.__file__) + ) + ) + if require_oscrypto: + print( + 'oscrypto: %s backend, %s, %s' % ( + oscrypto.backend(), + oscrypto.__version__, + os.path.dirname(oscrypto.__file__) + ) + ) diff --git a/dev/_pep425.py b/dev/_pep425.py new file mode 100644 index 0000000..949686a --- /dev/null +++ b/dev/_pep425.py @@ -0,0 +1,204 @@ +# coding: utf-8 + +""" +This file was originally derived from +https://github.com/pypa/pip/blob/3e713708088aedb1cde32f3c94333d6e29aaf86e/src/pip/_internal/pep425tags.py + +The following license covers that code: + +Copyright (c) 2008-2018 The pip developers (see AUTHORS.txt file) + +Permission is hereby granted, free of charge, to any person obtaining +a copy of this software and associated documentation files (the +"Software"), to deal in the Software without restriction, including +without limitation the rights to use, copy, modify, merge, publish, +distribute, sublicense, and/or sell copies of the Software, and to +permit persons to whom the Software is furnished to do so, subject to +the following conditions: + +The above copyright notice and this permission notice shall be +included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +""" + +from __future__ import unicode_literals, division, absolute_import, print_function + +import sys +import os +import ctypes +import re +import platform + +if sys.version_info >= (2, 7): + import sysconfig + +if sys.version_info < (3,): + str_cls = unicode # noqa +else: + str_cls = str + + +def _pep425_implementation(): + """ + :return: + A 2 character unicode string of the implementation - 'cp' for cpython + or 'pp' for PyPy + """ + + return 'pp' if hasattr(sys, 'pypy_version_info') else 'cp' + + +def _pep425_version(): + """ + :return: + A tuple of integers representing the Python version number + """ + + if hasattr(sys, 'pypy_version_info'): + return (sys.version_info[0], sys.pypy_version_info.major, + sys.pypy_version_info.minor) + else: + return (sys.version_info[0], sys.version_info[1]) + + +def _pep425_supports_manylinux(): + """ + :return: + A boolean indicating if the machine can use manylinux1 packages + """ + + try: + import _manylinux + return bool(_manylinux.manylinux1_compatible) + except (ImportError, AttributeError): + pass + + # Check for glibc 2.5 + try: + proc = ctypes.CDLL(None) + gnu_get_libc_version = proc.gnu_get_libc_version + gnu_get_libc_version.restype = ctypes.c_char_p + + ver = gnu_get_libc_version() + if not isinstance(ver, str_cls): + ver = ver.decode('ascii') + match = re.match(r'(\d+)\.(\d+)', ver) + return match and match.group(1) == '2' and int(match.group(2)) >= 5 + + except (AttributeError): + return False + + +def _pep425_get_abi(): + """ + :return: + A unicode string of the system abi. Will be something like: "cp27m", + "cp33m", etc. + """ + + try: + soabi = sysconfig.get_config_var('SOABI') + if soabi: + if soabi.startswith('cpython-'): + return 'cp%s' % soabi.split('-')[1] + return soabi.replace('.', '_').replace('-', '_') + except (IOError, NameError): + pass + + impl = _pep425_implementation() + suffix = '' + if impl == 'cp': + suffix += 'm' + if sys.maxunicode == 0x10ffff and sys.version_info < (3, 3): + suffix += 'u' + return '%s%s%s' % (impl, ''.join(map(str_cls, _pep425_version())), suffix) + + +def _pep425tags(): + """ + :return: + A list of 3-element tuples with unicode strings or None: + [0] implementation tag - cp33, pp27, cp26, py2, py2.py3 + [1] abi tag - cp26m, None + [2] arch tag - linux_x86_64, macosx_10_10_x85_64, etc + """ + + tags = [] + + versions = [] + version_info = _pep425_version() + major = version_info[:-1] + for minor in range(version_info[-1], -1, -1): + versions.append(''.join(map(str, major + (minor,)))) + + impl = _pep425_implementation() + + abis = [] + abi = _pep425_get_abi() + if abi: + abis.append(abi) + abi3 = _pep425_implementation() == 'cp' and sys.version_info >= (3,) + if abi3: + abis.append('abi3') + abis.append('none') + + if sys.platform == 'darwin': + plat_ver = platform.mac_ver() + ver_parts = plat_ver[0].split('.') + minor = int(ver_parts[1]) + arch = plat_ver[2] + if sys.maxsize == 2147483647: + arch = 'i386' + arches = [] + while minor > 5: + arches.append('macosx_10_%s_%s' % (minor, arch)) + arches.append('macosx_10_%s_intel' % (minor,)) + arches.append('macosx_10_%s_universal' % (minor,)) + minor -= 1 + else: + if sys.platform == 'win32': + if 'amd64' in sys.version.lower(): + arches = ['win_amd64'] + arches = [sys.platform] + elif hasattr(os, 'uname'): + (plat, _, _, _, machine) = os.uname() + plat = plat.lower().replace('/', '') + machine.replace(' ', '_').replace('/', '_') + if plat == 'linux' and sys.maxsize == 2147483647: + machine = 'i686' + arch = '%s_%s' % (plat, machine) + if _pep425_supports_manylinux(): + arches = [arch.replace('linux', 'manylinux1'), arch] + else: + arches = [arch] + + for abi in abis: + for arch in arches: + tags.append(('%s%s' % (impl, versions[0]), abi, arch)) + + if abi3: + for version in versions[1:]: + for arch in arches: + tags.append(('%s%s' % (impl, version), 'abi3', arch)) + + for arch in arches: + tags.append(('py%s' % (versions[0][0]), 'none', arch)) + + tags.append(('%s%s' % (impl, versions[0]), 'none', 'any')) + tags.append(('%s%s' % (impl, versions[0][0]), 'none', 'any')) + + for i, version in enumerate(versions): + tags.append(('py%s' % (version,), 'none', 'any')) + if i == 0: + tags.append(('py%s' % (version[0]), 'none', 'any')) + + tags.append(('py2.py3', 'none', 'any')) + + return tags diff --git a/dev/build.py b/dev/build.py new file mode 100644 index 0000000..4899594 --- /dev/null +++ b/dev/build.py @@ -0,0 +1,89 @@ +# coding: utf-8 +from __future__ import unicode_literals, division, absolute_import, print_function + +import imp +import os +import tarfile +import zipfile + +import setuptools.sandbox + +from . import package_root, package_name, has_tests_package + + +def _list_zip(filename): + """ + Prints all of the files in a .zip file + """ + + zf = zipfile.ZipFile(filename, 'r') + for name in zf.namelist(): + print(' %s' % name) + + +def _list_tgz(filename): + """ + Prints all of the files in a .tar.gz file + """ + + tf = tarfile.open(filename, 'r:gz') + for name in tf.getnames(): + print(' %s' % name) + + +def run(): + """ + Creates a sdist .tar.gz and a bdist_wheel --univeral .whl + + :return: + A bool - if the packaging process was successful + """ + + setup = os.path.join(package_root, 'setup.py') + tests_root = os.path.join(package_root, 'tests') + tests_setup = os.path.join(tests_root, 'setup.py') + + # Trying to call setuptools.sandbox.run_setup(setup, ['--version']) + # resulted in a segfault, so we do this instead + module_info = imp.find_module('version', [os.path.join(package_root, package_name)]) + version_mod = imp.load_module('%s.version' % package_name, *module_info) + + pkg_name_info = (package_name, version_mod.__version__) + print('Building %s-%s' % pkg_name_info) + + sdist = '%s-%s.tar.gz' % pkg_name_info + whl = '%s-%s-py2.py3-none-any.whl' % pkg_name_info + setuptools.sandbox.run_setup(setup, ['-q', 'sdist']) + print(' - created %s' % sdist) + _list_tgz(os.path.join(package_root, 'dist', sdist)) + setuptools.sandbox.run_setup(setup, ['-q', 'bdist_wheel', '--universal']) + print(' - created %s' % whl) + _list_zip(os.path.join(package_root, 'dist', whl)) + setuptools.sandbox.run_setup(setup, ['-q', 'clean']) + + if has_tests_package: + print('Building %s_tests-%s' % (package_name, version_mod.__version__)) + + tests_sdist = '%s_tests-%s.tar.gz' % pkg_name_info + tests_whl = '%s_tests-%s-py2.py3-none-any.whl' % pkg_name_info + setuptools.sandbox.run_setup(tests_setup, ['-q', 'sdist']) + print(' - created %s' % tests_sdist) + _list_tgz(os.path.join(tests_root, 'dist', tests_sdist)) + setuptools.sandbox.run_setup(tests_setup, ['-q', 'bdist_wheel', '--universal']) + print(' - created %s' % tests_whl) + _list_zip(os.path.join(tests_root, 'dist', tests_whl)) + setuptools.sandbox.run_setup(tests_setup, ['-q', 'clean']) + + dist_dir = os.path.join(package_root, 'dist') + tests_dist_dir = os.path.join(tests_root, 'dist') + os.rename( + os.path.join(tests_dist_dir, tests_sdist), + os.path.join(dist_dir, tests_sdist) + ) + os.rename( + os.path.join(tests_dist_dir, tests_whl), + os.path.join(dist_dir, tests_whl) + ) + os.rmdir(tests_dist_dir) + + return True @@ -1,8 +1,18 @@ # coding: utf-8 from __future__ import unicode_literals, division, absolute_import, print_function +import os +import site import sys +from . import build_root, requires_oscrypto +from ._import import _preload + + +deps_dir = os.path.join(build_root, 'modularcrypto-deps') +if os.path.exists(deps_dir): + site.addsitedir(deps_dir) + if sys.version_info[0:2] not in [(2, 6), (3, 2)]: from .lint import run as run_lint else: @@ -25,7 +35,8 @@ def run(): A bool - if the linter and tests ran successfully """ - print('Python ' + sys.version.replace('\n', '')) + _preload(requires_oscrypto, True) + if run_lint: print('') lint_result = run_lint() @@ -39,7 +50,7 @@ def run(): else: print('\nRunning tests') sys.stdout.flush() - tests_result = run_tests() + tests_result = run_tests(ci=True) sys.stdout.flush() return lint_result and tests_result diff --git a/dev/coverage.py b/dev/coverage.py index 5a24a4d..b9a55de 100644 --- a/dev/coverage.py +++ b/dev/coverage.py @@ -1,28 +1,38 @@ # coding: utf-8 from __future__ import unicode_literals, division, absolute_import, print_function +import cgi +import codecs import coverage import imp import json import os import unittest +import re import sys +import tempfile +import time import platform as _plat import subprocess from fnmatch import fnmatch +from . import package_name, package_root, other_packages + if sys.version_info < (3,): - str_cls = unicode - from urllib2 import Request, urlopen, URLError, HTTPError + str_cls = unicode # noqa + from urllib2 import URLError from urllib import urlencode - import cgi from io import open else: str_cls = str - from urllib.request import Request, urlopen - from urllib.error import URLError, HTTPError + from urllib.error import URLError from urllib.parse import urlencode +if sys.version_info < (3, 7): + Pattern = re._pattern_type +else: + Pattern = re.Pattern + def run(ci=False): """ @@ -36,11 +46,11 @@ def run(ci=False): A bool - if the tests ran successfully """ - xml_report_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'coverage.xml')) + xml_report_path = os.path.join(package_root, 'coverage.xml') if os.path.exists(xml_report_path): os.unlink(xml_report_path) - cov = coverage.Coverage(include='asn1crypto/*.py') + cov = coverage.Coverage(include='%s/*.py' % package_name) cov.start() from .tests import run as run_tests @@ -50,8 +60,8 @@ def run(ci=False): if ci: suite = unittest.TestSuite() loader = unittest.TestLoader() - for package_name in ['oscrypto', 'certbuilder', 'certvalidator', 'crlbuilder', 'csrbuild', 'ocspbuilder']: - for test_class in _load_package_tests(package_name): + for other_package in other_packages: + for test_class in _load_package_tests(other_package): suite.addTest(loader.loadTestsFromTestCase(test_class)) if suite.countTestCases() > 0: @@ -98,8 +108,44 @@ def _load_package_tests(name): return tests_module.test_classes() -def _codecov_submit(): +def _env_info(): + """ + :return: + A two-element tuple of unicode strings. The first is the name of the + environment, the second the root of the repo. The environment name + will be one of: "ci-travis", "ci-circle", "ci-appveyor", + "ci-github-actions", "local" + """ + if os.getenv('CI') == 'true' and os.getenv('TRAVIS') == 'true': + return ('ci-travis', os.getenv('TRAVIS_BUILD_DIR')) + + if os.getenv('CI') == 'True' and os.getenv('APPVEYOR') == 'True': + return ('ci-appveyor', os.getenv('APPVEYOR_BUILD_FOLDER')) + + if os.getenv('CI') == 'true' and os.getenv('CIRCLECI') == 'true': + return ('ci-circle', os.getcwdu() if sys.version_info < (3,) else os.getcwd()) + + if os.getenv('GITHUB_ACTIONS') == 'true': + return ('ci-github-actions', os.getenv('GITHUB_WORKSPACE')) + + return ('local', package_root) + + +def _codecov_submit(): + env_name, root = _env_info() + + try: + with open(os.path.join(root, 'codecov.json'), 'rb') as f: + json_data = json.loads(f.read().decode('utf-8')) + except (OSError, ValueError, UnicodeDecodeError, KeyError): + print('error reading codecov.json') + return + + if json_data.get('disabled'): + return + + if env_name == 'ci-travis': # http://docs.travis-ci.com/user/environment-variables/#Default-Environment-Variables build_url = 'https://travis-ci.org/%s/jobs/%s' % (os.getenv('TRAVIS_REPO_SLUG'), os.getenv('TRAVIS_JOB_ID')) query = { @@ -113,25 +159,30 @@ def _codecov_submit(): 'commit': os.getenv('TRAVIS_COMMIT'), 'build_url': build_url, } - root = os.getenv('TRAVIS_BUILD_DIR') - elif os.getenv('CI') == 'True' and os.getenv('APPVEYOR') == 'True': + elif env_name == 'ci-appveyor': # http://www.appveyor.com/docs/environment-variables - build_url = 'https://ci.appveyor.com/project/%s/build/%s' % (os.getenv('APPVEYOR_REPO_NAME'), os.getenv('APPVEYOR_BUILD_VERSION')) + build_url = 'https://ci.appveyor.com/project/%s/build/%s' % ( + os.getenv('APPVEYOR_REPO_NAME'), + os.getenv('APPVEYOR_BUILD_VERSION') + ) query = { 'service': "appveyor", 'branch': os.getenv('APPVEYOR_REPO_BRANCH'), 'build': os.getenv('APPVEYOR_JOB_ID'), 'pr': os.getenv('APPVEYOR_PULL_REQUEST_NUMBER'), - 'job': '/'.join((os.getenv('APPVEYOR_ACCOUNT_NAME'), os.getenv('APPVEYOR_PROJECT_SLUG'), os.getenv('APPVEYOR_BUILD_VERSION'))), + 'job': '/'.join(( + os.getenv('APPVEYOR_ACCOUNT_NAME'), + os.getenv('APPVEYOR_PROJECT_SLUG'), + os.getenv('APPVEYOR_BUILD_VERSION') + )), 'tag': os.getenv('APPVEYOR_REPO_TAG_NAME'), 'slug': os.getenv('APPVEYOR_REPO_NAME'), 'commit': os.getenv('APPVEYOR_REPO_COMMIT'), 'build_url': build_url, } - root = os.getenv('APPVEYOR_BUILD_FOLDER') - elif os.getenv('CI') == 'true' and os.getenv('CIRCLECI') == 'true': + elif env_name == 'ci-circle': # https://circleci.com/docs/environment-variables query = { 'service': 'circleci', @@ -144,12 +195,32 @@ def _codecov_submit(): 'commit': os.getenv('CIRCLE_SHA1'), 'build_url': os.getenv('CIRCLE_BUILD_URL'), } - if sys.version_info < (3,): - root = os.getcwdu() - else: - root = os.getcwd() + + elif env_name == 'ci-github-actions': + branch = '' + tag = '' + ref = os.getenv('GITHUB_REF', '') + if ref.startswith('refs/tags/'): + tag = ref[10:] + elif ref.startswith('refs/heads/'): + branch = ref[11:] + + impl = _plat.python_implementation() + major, minor = _plat.python_version_tuple()[0:2] + build_name = '%s %s %s.%s' % (_platform_name(), impl, major, minor) + + query = { + 'service': 'custom', + 'token': json_data['token'], + 'branch': branch, + 'tag': tag, + 'slug': os.getenv('GITHUB_REPOSITORY'), + 'commit': os.getenv('GITHUB_SHA'), + 'build_url': 'https://github.com/wbond/oscrypto/commit/%s/checks' % os.getenv('GITHUB_SHA'), + 'name': 'GitHub Actions %s on %s' % (build_name, os.getenv('RUNNER_OS')) + } + else: - root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) if not os.path.exists(os.path.join(root, '.git')): print('git repository not found, not submitting coverage data') return @@ -158,17 +229,6 @@ def _codecov_submit(): print('git repository has uncommitted changes, not submitting coverage data') return - slug = None - token = None - try: - with open(os.path.join(root, 'codecov.json'), 'rb') as f: - json_data = json.loads(f.read().decode('utf-8')) - slug = json_data['slug'] - token = json_data['token'] - except (OSError, ValueError, UnicodeDecodeError, KeyError): - print('error reading codecov.json') - return - branch = _git_command(['rev-parse', '--abbrev-ref', 'HEAD'], root) commit = _git_command(['rev-parse', '--verify', 'HEAD'], root) tag = _git_command(['name-rev', '--tags', '--name-only', commit], root) @@ -178,8 +238,8 @@ def _codecov_submit(): query = { 'branch': branch, 'commit': commit, - 'slug': slug, - 'token': token, + 'slug': json_data['slug'], + 'token': json_data['token'], 'build': build_name, } if tag != 'undefined': @@ -198,7 +258,7 @@ def _codecov_submit(): payload += '# path=coverage.xml\n' with open(os.path.join(root, 'coverage.xml'), 'r', encoding='utf-8') as f: payload += f.read() + '\n' - payload +='<<<<<< EOF\n' + payload += '<<<<<< EOF\n' url = 'https://codecov.io/upload/v4' headers = { @@ -222,7 +282,7 @@ def _codecov_submit(): encoding = info[1] or 'utf-8' text = info[2].decode(encoding).strip() parts = text.split() - result, upload_url = parts[0], parts[1] + upload_url = parts[1] headers = { 'Content-Type': 'text/plain', @@ -231,7 +291,7 @@ def _codecov_submit(): } print('Uploading coverage data to codecov.io S3 bucket') - put_info = _do_request( + _do_request( 'PUT', upload_url, headers, @@ -430,12 +490,12 @@ def _gitignore(root): return (dir_patterns, file_patterns) -def _do_request(method, url, headers, data=None, query_params=None, timeout=30): +def _do_request(method, url, headers, data=None, query_params=None, timeout=20): """ Performs an HTTP request :param method: - A unicode string of 'GET', 'POST', 'PUT', or 'DELETE' + A unicode string of 'POST' or 'PUT' :param url; A unicode string of the URL to request @@ -464,9 +524,6 @@ def _do_request(method, url, headers, data=None, query_params=None, timeout=30): if query_params: url += '?' + urlencode(query_params).replace('+', '%20') - request = Request(url) - request.get_method = lambda: method - if isinstance(data, dict): data_bytes = {} for key in data: @@ -476,29 +533,142 @@ def _do_request(method, url, headers, data=None, query_params=None, timeout=30): if isinstance(data, str_cls): raise TypeError('data must be a byte string') - for key in headers: - value = headers[key] - if sys.version_info < (3,): - key = key.encode('iso-8859-1') - value = value.encode('iso-8859-1') - request.add_header(key, value) - - response = urlopen(request, data, timeout) - if sys.version_info < (3,): - status = response.getcode() - try: - content_type, params = cgi.parse_header(response.headers['Content-Type'].strip()) - encoding = params.get('charset') - except (KeyError): - content_type = None - encoding = None + try: + tempfd, tempf_path = tempfile.mkstemp('-coverage') + os.write(tempfd, data or b'') + os.close(tempfd) + + if sys.platform == 'win32': + powershell_exe = os.path.join('system32\\WindowsPowerShell\\v1.0\\powershell.exe') + code = "[System.Net.ServicePointManager]::SecurityProtocol = [System.Net.SecurityProtocolType]::Tls12;" + code += "$wc = New-Object Net.WebClient;" + for key in headers: + code += "$wc.Headers.add('%s','%s');" % (key, headers[key]) + code += "$out = $wc.UploadFile('%s', '%s', '%s');" % (url, method, tempf_path) + code += "[System.Text.Encoding]::GetEncoding('ISO-8859-1').GetString($wc.ResponseHeaders.ToByteArray())" + + # To properly obtain bytes, we use BitConverter to get hex dash + # encoding (e.g. AE-09-3F) and they decode in python + code += " + [System.BitConverter]::ToString($out);" + stdout, stderr = _execute( + [powershell_exe, '-Command', code], + os.getcwd(), + re.compile(r'Unable to connect to|TLS|Internal Server Error'), + 6 + ) + if stdout[-2:] == b'\r\n' and b'\r\n\r\n' in stdout: + # An extra trailing crlf is added at the end by powershell + stdout = stdout[0:-2] + parts = stdout.split(b'\r\n\r\n', 1) + if len(parts) == 2: + stdout = parts[0] + b'\r\n\r\n' + codecs.decode(parts[1].replace(b'-', b''), 'hex_codec') + + else: + args = [ + 'curl', + '--request', + method, + '--location', + '--silent', + '--show-error', + '--include', + # Prevent curl from asking for an HTTP "100 Continue" response + '--header', 'Expect:' + ] + for key in headers: + args.append('--header') + args.append("%s: %s" % (key, headers[key])) + args.append('--data-binary') + args.append('@%s' % tempf_path) + args.append(url) + stdout, stderr = _execute( + args, + os.getcwd(), + re.compile(r'Failed to connect to|TLS|SSLRead|outstanding|cleanly'), + 6 + ) + finally: + if tempf_path and os.path.exists(tempf_path): + os.remove(tempf_path) + + if len(stderr) > 0: + raise URLError("Error %sing %s:\n%s" % (method, url, stderr)) + + parts = stdout.split(b'\r\n\r\n', 1) + if len(parts) != 2: + raise URLError("Error %sing %s, response data malformed:\n%s" % (method, url, stdout)) + header_block, body = parts + + content_type_header = None + content_len_header = None + for hline in header_block.decode('iso-8859-1').splitlines(): + hline_parts = hline.split(':', 1) + if len(hline_parts) != 2: + continue + name, val = hline_parts + name = name.strip().lower() + val = val.strip() + if name == 'content-type': + content_type_header = val + if name == 'content-length': + content_len_header = val + + if content_type_header is None and content_len_header != '0': + raise URLError("Error %sing %s, no content-type header:\n%s" % (method, url, stdout)) + + if content_type_header is None: + content_type = 'text/plain' + encoding = 'utf-8' else: - status = response.status - content_type = response.info().get_content_type() - encoding = response.headers.get_content_charset() - if status != 200: - raise HTTPError('Unexpected HTTP %d response' % status) - return (content_type, encoding, response.read()) + content_type, params = cgi.parse_header(content_type_header) + encoding = params.get('charset') + + return (content_type, encoding, body) + + +def _execute(params, cwd, retry=None, retries=0): + """ + Executes a subprocess + + :param params: + A list of the executable and arguments to pass to it + + :param cwd: + The working directory to execute the command in + + :param retry: + If this string is present in stderr, or regex pattern matches stderr, retry the operation + + :param retries: + An integer number of times to retry + + :return: + A 2-element tuple of (stdout, stderr) + """ + + proc = subprocess.Popen( + params, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + cwd=cwd + ) + stdout, stderr = proc.communicate() + code = proc.wait() + if code != 0: + if retry and retries > 0: + stderr_str = stderr.decode('utf-8') + if isinstance(retry, Pattern): + if retry.search(stderr_str) is not None: + time.sleep(5) + return _execute(params, cwd, retry, retries - 1) + elif retry in stderr_str: + time.sleep(5) + return _execute(params, cwd, retry, retries - 1) + e = OSError('subprocess exit code for "%s" was %d: %s' % (' '.join(params), code, stderr)) + e.stdout = stdout + e.stderr = stderr + raise e + return (stdout, stderr) if __name__ == '__main__': diff --git a/dev/deps.py b/dev/deps.py index 36370f0..7014172 100644 --- a/dev/deps.py +++ b/dev/deps.py @@ -1,66 +1,58 @@ # coding: utf-8 from __future__ import unicode_literals, division, absolute_import, print_function -import imp import os import subprocess import sys -import warnings import shutil -import tempfile -import platform -import site +import re +import json +import tarfile +import zipfile +from . import package_root, build_root, other_packages +from ._pep425 import _pep425tags, _pep425_implementation -OTHER_PACKAGES = [ - 'https://github.com/wbond/oscrypto.git', - 'https://github.com/wbond/certbuilder.git', - 'https://github.com/wbond/certvalidator.git', - 'https://github.com/wbond/crlbuilder.git', - 'https://github.com/wbond/csrbuilder.git', - 'https://github.com/wbond/ocspbuilder.git', -] +if sys.version_info < (3,): + str_cls = unicode # noqa +else: + str_cls = str def run(): """ - Ensures a recent version of pip is installed, then uses that to install - required development dependencies. Uses git to checkout other modularcrypto - repos for more accurate coverage data. + Installs required development dependencies. Uses git to checkout other + modularcrypto repos for more accurate coverage data. """ - package_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) - build_root = os.path.abspath(os.path.join(package_root, '..')) - try: - tmpdir = None - with warnings.catch_warnings(): - warnings.simplefilter("ignore") - - major_minor = '%s.%s' % sys.version_info[0:2] - tmpdir = tempfile.mkdtemp() - _pip = _bootstrap_pip(tmpdir) - - print("Using pip to install dependencies") - _pip(['install', '-q', '--upgrade', '-r', os.path.join(package_root, 'requires', 'ci')]) - - if OTHER_PACKAGES: - print("Checking out modularcrypto packages for coverage") - for pkg_url in OTHER_PACKAGES: - pkg_name = os.path.basename(pkg_url).replace('.git', '') - pkg_dir = os.path.join(build_root, pkg_name) - if os.path.exists(pkg_dir): - print("%s is already present" % pkg_name) - continue - print("Cloning %s" % pkg_url) - _execute(['git', 'clone', pkg_url], build_root) - print() + deps_dir = os.path.join(build_root, 'modularcrypto-deps') + if os.path.exists(deps_dir): + shutil.rmtree(deps_dir, ignore_errors=True) + os.mkdir(deps_dir) - finally: - if tmpdir: - shutil.rmtree(tmpdir, ignore_errors=True) + try: + print("Staging ci dependencies") + _stage_requirements(deps_dir, os.path.join(package_root, 'requires', 'ci')) + + print("Checking out modularcrypto packages for coverage") + for other_package in other_packages: + pkg_url = 'https://github.com/wbond/%s.git' % other_package + pkg_dir = os.path.join(build_root, other_package) + if os.path.exists(pkg_dir): + print("%s is already present" % other_package) + continue + print("Cloning %s" % pkg_url) + _execute(['git', 'clone', pkg_url], build_root) + print() + + except (Exception): + if os.path.exists(deps_dir): + shutil.rmtree(deps_dir, ignore_errors=True) + raise return True + def _download(url, dest): """ Downloads a URL to a directory @@ -75,138 +67,455 @@ def _download(url, dest): The filesystem path to the saved file """ + print('Downloading %s' % url) filename = os.path.basename(url) dest_path = os.path.join(dest, filename) if sys.platform == 'win32': - system_root = os.environ.get('SystemRoot') powershell_exe = os.path.join('system32\\WindowsPowerShell\\v1.0\\powershell.exe') code = "[System.Net.ServicePointManager]::SecurityProtocol = [System.Net.SecurityProtocolType]::Tls12;" - code += " (New-Object Net.WebClient).DownloadFile('%s', '%s');" % (url, dest_path) - _execute([powershell_exe, '-Command', code], dest) + code += "(New-Object Net.WebClient).DownloadFile('%s', '%s');" % (url, dest_path) + _execute([powershell_exe, '-Command', code], dest, 'Unable to connect to') else: - _execute(['curl', '--silent', '--show-error', '-O', url], dest) + _execute( + ['curl', '-L', '--silent', '--show-error', '-O', url], + dest, + 'Failed to connect to' + ) return dest_path -def _execute(params, cwd): +def _tuple_from_ver(version_string): """ - Executes a subprocess + :param version_string: + A unicode dotted version string - :param params: - A list of the executable and arguments to pass to it + :return: + A tuple of integers + """ - :param cwd: - The working directory to execute the command in + return tuple(map(int, version_string.split('.'))) + + +def _open_archive(path): + """ + :param path: + A unicode string of the filesystem path to the archive :return: - A 2-element tuple of (stdout, stderr) + An archive object """ - proc = subprocess.Popen( - params, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - cwd=cwd - ) - stdout, stderr = proc.communicate() - code = proc.wait() - if code != 0: - e = OSError('subprocess exit code was non-zero') - e.stdout = stdout - e.stderr = stderr - raise e - return (stdout, stderr) + if path.endswith('.zip'): + return zipfile.ZipFile(path, 'r') + return tarfile.open(path, 'r') -def _get_pip_main(download_dir): +def _list_archive_members(archive): """ - Executes get-pip.py in the current Python interpreter + :param archive: + An archive from _open_archive() - :param download_dir: - The directory that contains get-pip.py + :return: + A list of info objects to be used with _info_name() and _extract_info() """ - module_info = imp.find_module('get-pip', [download_dir]) - get_pip_module = imp.load_module('_cideps.get-pip', *module_info) + if isinstance(archive, zipfile.ZipFile): + return archive.infolist() + return archive.getmembers() - orig_sys_exit = sys.exit - orig_sys_argv = sys.argv - sys.exit = lambda c: None - sys.argv = ['get-pip.py', '--user', '-q'] - get_pip_module.main() +def _archive_single_dir(archive): + """ + Check if all members of the archive are in a single top-level directory - sys.exit = orig_sys_exit - sys.argv = orig_sys_argv + :param archive: + An archive from _open_archive() - # Unload pip modules that came from the zip file - module_names = sorted(sys.modules.keys()) - end_token = os.sep + 'pip.zip' - mid_token = end_token + os.sep + 'pip' - for module_name in module_names: - try: - module_path = sys.modules[module_name].__file__ - if mid_token in module_path or module_path.endswith(end_token): - del sys.modules[module_name] - except AttributeError: - pass + :return: + None if not a single top level directory in archive, otherwise a + unicode string of the top level directory name + """ + + common_root = None + for info in _list_archive_members(archive): + fn = _info_name(info) + if fn in set(['.', '/']): + continue + sep = None + if '/' in fn: + sep = '/' + elif '\\' in fn: + sep = '\\' + if sep is None: + root_dir = fn + else: + root_dir, _ = fn.split(sep, 1) + if common_root is None: + common_root = root_dir + else: + if common_root != root_dir: + return None + return common_root + + +def _info_name(info): + """ + Returns a normalized file path for an archive info object - if sys.path[0].endswith('pip.zip'): - sys.path = sys.path[1:] + :param info: + An info object from _list_archive_members() - if site.USER_SITE not in sys.path: - sys.path.append(site.USER_SITE) + :return: + A unicode string with all directory separators normalized to "/" + """ + if isinstance(info, zipfile.ZipInfo): + return info.filename.replace('\\', '/') + return info.name.replace('\\', '/') -def _bootstrap_pip(tmpdir): + +def _extract_info(archive, info): """ - Bootstraps the current version of pip for use in the current Python - interpreter + Extracts the contents of an archive info object + + ;param archive: + An archive from _open_archive() - :param tmpdir: - A temporary directory to download get-pip.py and cacert.pem + :param info: + An info object from _list_archive_members() :return: - A function that invokes pip. Accepts one arguments, a list of parameters - to pass to pip. + None, or a byte string of the file contents + """ + + if isinstance(archive, zipfile.ZipFile): + fn = info.filename + is_dir = fn.endswith('/') or fn.endswith('\\') + out = archive.read(info) + if is_dir and out == b'': + return None + return out + + info_file = archive.extractfile(info) + if info_file: + return info_file.read() + return None + + +def _extract_package(deps_dir, pkg_path, pkg_dir): + """ + Extract a .whl, .zip, .tar.gz or .tar.bz2 into a package path to + use when running CI tasks + + :param deps_dir: + A unicode string of the directory the package should be extracted to + + :param pkg_path: + A unicode string of the path to the archive + + :param pkg_dir: + If running setup.py, change to this dir first - a unicode string """ + if pkg_path.endswith('.exe'): + try: + zf = None + zf = zipfile.ZipFile(pkg_path, 'r') + # Exes have a PLATLIB folder containing everything we want + for zi in zf.infolist(): + if not zi.filename.startswith('PLATLIB'): + continue + data = _extract_info(zf, zi) + if data is not None: + dst_path = os.path.join(deps_dir, zi.filename[8:]) + dst_dir = os.path.dirname(dst_path) + if not os.path.exists(dst_dir): + os.makedirs(dst_dir) + with open(dst_path, 'wb') as f: + f.write(data) + finally: + if zf: + zf.close() + return + + if pkg_path.endswith('.whl'): + try: + zf = None + zf = zipfile.ZipFile(pkg_path, 'r') + # Wheels contain exactly what we need and nothing else + zf.extractall(deps_dir) + finally: + if zf: + zf.close() + return + + # Source archives may contain a bunch of other things, including mutliple + # packages, so we must use setup.py/setuptool to install/extract it + + ar = None + staging_dir = os.path.join(deps_dir, '_staging') try: - import pip + ar = _open_archive(pkg_path) + + common_root = _archive_single_dir(ar) + + members = [] + for info in _list_archive_members(ar): + dst_rel_path = _info_name(info) + if common_root is not None: + dst_rel_path = dst_rel_path[len(common_root) + 1:] + members.append((info, dst_rel_path)) + + if not os.path.exists(staging_dir): + os.makedirs(staging_dir) + + for info, rel_path in members: + info_data = _extract_info(ar, info) + # Dirs won't return a file + if info_data is not None: + dst_path = os.path.join(staging_dir, rel_path) + dst_dir = os.path.dirname(dst_path) + if not os.path.exists(dst_dir): + os.makedirs(dst_dir) + with open(dst_path, 'wb') as f: + f.write(info_data) + + setup_dir = staging_dir + if pkg_dir: + setup_dir = os.path.join(staging_dir, pkg_dir) + + root = os.path.abspath(os.path.join(deps_dir, '..')) + install_lib = os.path.basename(deps_dir) + + _execute( + [ + sys.executable, + 'setup.py', + 'install', + '--root=%s' % root, + '--install-lib=%s' % install_lib, + '--no-compile' + ], + setup_dir + ) + + finally: + if ar: + ar.close() + if staging_dir: + shutil.rmtree(staging_dir) + + +def _stage_requirements(deps_dir, path): + """ + Installs requirements without using Python to download, since + different services are limiting to TLS 1.2, and older version of + Python do not support that + + :param deps_dir: + A unicode path to a temporary diretory to use for downloads + + :param path: + A unicode filesystem path to a requirements file + """ + + valid_tags = _pep425tags() + + exe_suffix = None + if sys.platform == 'win32' and _pep425_implementation() == 'cp': + win_arch = 'win32' if sys.maxsize == 2147483647 else 'win-amd64' + version_info = sys.version_info + exe_suffix = '.%s-py%d.%d.exe' % (win_arch, version_info[0], version_info[1]) + + packages = _parse_requires(path) + for p in packages: + pkg = p['pkg'] + pkg_sub_dir = None + if p['type'] == 'url': + anchor = None + if '#' in pkg: + pkg, anchor = pkg.split('#', 1) + if '&' in anchor: + parts = anchor.split('&') + else: + parts = [anchor] + for part in parts: + param, value = part.split('=') + if param == 'subdirectory': + pkg_sub_dir = value + + if pkg.endswith('.zip') or pkg.endswith('.tar.gz') or pkg.endswith('.tar.bz2') or pkg.endswith('.whl'): + url = pkg + else: + raise Exception('Unable to install package from URL that is not an archive') + else: + pypi_json_url = 'https://pypi.org/pypi/%s/json' % pkg + json_dest = _download(pypi_json_url, deps_dir) + with open(json_dest, 'rb') as f: + pkg_info = json.loads(f.read().decode('utf-8')) + if os.path.exists(json_dest): + os.remove(json_dest) + + latest = pkg_info['info']['version'] + if p['type'] == '>=': + if _tuple_from_ver(p['ver']) > _tuple_from_ver(latest): + raise Exception('Unable to find version %s of %s, newest is %s' % (p['ver'], pkg, latest)) + version = latest + elif p['type'] == '==': + if p['ver'] not in pkg_info['releases']: + raise Exception('Unable to find version %s of %s' % (p['ver'], pkg)) + version = p['ver'] + else: + version = latest + + wheels = {} + whl = None + tar_bz2 = None + tar_gz = None + exe = None + for download in pkg_info['releases'][version]: + if exe_suffix and download['url'].endswith(exe_suffix): + exe = download['url'] + if download['url'].endswith('.whl'): + parts = os.path.basename(download['url']).split('-') + tag_impl = parts[-3] + tag_abi = parts[-2] + tag_arch = parts[-1].split('.')[0] + wheels[(tag_impl, tag_abi, tag_arch)] = download['url'] + if download['url'].endswith('.tar.bz2'): + tar_bz2 = download['url'] + if download['url'].endswith('.tar.gz'): + tar_gz = download['url'] + + # Find the most-specific wheel possible + for tag in valid_tags: + if tag in wheels: + whl = wheels[tag] + break + + if exe_suffix and exe: + url = exe + elif whl: + url = whl + elif tar_bz2: + url = tar_bz2 + elif tar_gz: + url = tar_gz + else: + raise Exception('Unable to find suitable download for %s' % pkg) + + local_path = _download(url, deps_dir) + + _extract_package(deps_dir, local_path, pkg_sub_dir) + + os.remove(local_path) + + +def _parse_requires(path): + """ + Does basic parsing of pip requirements files, to allow for + using something other than Python to do actual TLS requests - print('Upgrading pip') - pip.main(['install', '-q', '--upgrade', 'pip']) - certs_path = None + :param path: + A path to a requirements file - except ImportError: - print("Downloading cacert.pem from curl") - certs_path = _download('https://curl.haxx.se/ca/cacert.pem', tmpdir) + :return: + A list of dict objects containing the keys: + - 'type' ('any', 'url', '==', '>=') + - 'pkg' + - 'ver' (if 'type' == '==' or 'type' == '>=') + """ - print("Downloading get-pip.py") - if sys.version_info[0:2] == (3, 2): - path = _download('https://bootstrap.pypa.io/3.2/get-pip.py', tmpdir) + python_version = '.'.join(map(str_cls, sys.version_info[0:2])) + sys_platform = sys.platform + + packages = [] + + with open(path, 'rb') as f: + contents = f.read().decode('utf-8') + + for line in re.split(r'\r?\n', contents): + line = line.strip() + if not len(line): + continue + if re.match(r'^\s*#', line): + continue + if ';' in line: + package, cond = line.split(';', 1) + package = package.strip() + cond = cond.strip() + cond = cond.replace('sys_platform', repr(sys_platform)) + cond = cond.replace('python_version', repr(python_version)) + if not eval(cond): + continue else: - path = _download('https://bootstrap.pypa.io/get-pip.py', tmpdir) - - print("Running get-pip.py") - _get_pip_main(tmpdir) - - import pip - - def _pip(args): - base_args = ['--disable-pip-version-check'] - if certs_path: - base_args += ['--cert', certs_path] - if sys.platform == 'darwin' and sys.version_info[0:2] in [(2, 6), (2, 7)]: - new_args = [] - for arg in args: - new_args.append(arg) - if arg == 'install': - new_args.append('--user') - args = new_args - pip.main(base_args + args) - - return _pip + package = line.strip() + + if re.match(r'^\s*-r\s*', package): + sub_req_file = re.sub(r'^\s*-r\s*', '', package) + sub_req_file = os.path.abspath(os.path.join(os.path.dirname(path), sub_req_file)) + packages.extend(_parse_requires(sub_req_file)) + continue + + if re.match(r'https?://', package): + packages.append({'type': 'url', 'pkg': package}) + continue + + if '>=' in package: + parts = package.split('>=') + package = parts[0].strip() + ver = parts[1].strip() + packages.append({'type': '>=', 'pkg': package, 'ver': ver}) + continue + + if '==' in package: + parts = package.split('==') + package = parts[0].strip() + ver = parts[1].strip() + packages.append({'type': '==', 'pkg': package, 'ver': ver}) + continue + + if re.search(r'[^ a-zA-Z0-9\-]', package): + raise Exception('Unsupported requirements format version constraint: %s' % package) + + packages.append({'type': 'any', 'pkg': package}) + + return packages + + +def _execute(params, cwd, retry=None): + """ + Executes a subprocess + + :param params: + A list of the executable and arguments to pass to it + + :param cwd: + The working directory to execute the command in + + :param retry: + If this string is present in stderr, retry the operation + + :return: + A 2-element tuple of (stdout, stderr) + """ + + proc = subprocess.Popen( + params, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + cwd=cwd + ) + stdout, stderr = proc.communicate() + code = proc.wait() + if code != 0: + if retry and retry in stderr.decode('utf-8'): + return _execute(params, cwd) + e = OSError('subprocess exit code for "%s" was %d: %s' % (' '.join(params), code, stderr)) + e.stdout = stdout + e.stderr = stderr + raise e + return (stdout, stderr) diff --git a/dev/lint.py b/dev/lint.py index 39513b3..cbfa9a7 100644 --- a/dev/lint.py +++ b/dev/lint.py @@ -3,6 +3,8 @@ from __future__ import unicode_literals, division, absolute_import, print_functi import os +from . import package_name, package_root + import flake8 if not hasattr(flake8, '__version_info__') or flake8.__version_info__ < (3,): from flake8.engine import get_style_guide @@ -10,10 +12,6 @@ else: from flake8.api.legacy import get_style_guide -cur_dir = os.path.dirname(__file__) -config_file = os.path.join(cur_dir, '..', 'tox.ini') - - def run(): """ Runs flake8 lint @@ -22,16 +20,17 @@ def run(): A bool - if flake8 did not find any errors """ - print('Running flake8') + print('Running flake8 %s' % flake8.__version__) - flake8_style = get_style_guide(config_file=config_file) + flake8_style = get_style_guide(config_file=os.path.join(package_root, 'tox.ini')) paths = [] - for root, _, filenames in os.walk('asn1crypto'): - for filename in filenames: - if not filename.endswith('.py'): - continue - paths.append(os.path.join(root, filename)) + for _dir in [package_name, 'dev', 'tests']: + for root, _, filenames in os.walk(_dir): + for filename in filenames: + if not filename.endswith('.py'): + continue + paths.append(os.path.join(root, filename)) report = flake8_style.check_files(paths) success = report.total_errors == 0 if success: diff --git a/dev/release.py b/dev/release.py index 316d75c..a854196 100644 --- a/dev/release.py +++ b/dev/release.py @@ -1,16 +1,13 @@ # coding: utf-8 from __future__ import unicode_literals, division, absolute_import, print_function -import os import subprocess import sys -import setuptools.sandbox import twine.cli - -base_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) -setup_file = os.path.join(base_dir, 'setup.py') +from . import package_name, package_root, has_tests_package +from .build import run as build def run(): @@ -26,7 +23,7 @@ def run(): ['git', 'status', '--porcelain', '-uno'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, - cwd=base_dir + cwd=package_root ) git_wc_status, _ = git_wc_proc.communicate() @@ -39,7 +36,7 @@ def run(): ['git', 'tag', '-l', '--contains', 'HEAD'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, - cwd=base_dir + cwd=package_root ) tag, tag_error = git_tag_proc.communicate() @@ -54,14 +51,10 @@ def run(): tag = tag.decode('ascii').strip() - setuptools.sandbox.run_setup( - setup_file, - ['sdist', 'bdist_wheel', '--universal'] - ) + build() - twine.cli.dispatch(['upload', 'dist/asn1crypto-%s*' % tag]) + twine.cli.dispatch(['upload', 'dist/%s-%s*' % (package_name, tag)]) + if has_tests_package: + twine.cli.dispatch(['upload', 'dist/%s_tests-%s*' % (package_name, tag)]) - setuptools.sandbox.run_setup( - setup_file, - ['clean'] - ) + return True diff --git a/dev/tests.py b/dev/tests.py index 071ee23..a065c38 100644 --- a/dev/tests.py +++ b/dev/tests.py @@ -5,10 +5,19 @@ import unittest import re import sys +from . import requires_oscrypto +from ._import import _preload + from tests import test_classes +if sys.version_info < (3,): + range = xrange # noqa + from cStringIO import StringIO +else: + from io import StringIO + -def run(matcher=None): +def run(matcher=None, repeat=1, ci=False): """ Runs the tests @@ -16,20 +25,51 @@ def run(matcher=None): A unicode string containing a regular expression to use to filter test names by. A value of None will cause no filtering. + :param repeat: + An integer - the number of times to run the tests + + :param ci: + A bool, indicating if the tests are being run as part of CI + :return: A bool - if the tests succeeded """ - suite = unittest.TestSuite() + _preload(requires_oscrypto, not ci) + loader = unittest.TestLoader() + # We have to manually track the list of applicable tests because for + # some reason with Python 3.4 on Windows, the tests in a suite are replaced + # with None after being executed. This breaks the repeat functionality. + test_list = [] for test_class in test_classes(): if matcher: names = loader.getTestCaseNames(test_class) for name in names: if re.search(matcher, name): - suite.addTest(test_class(name)) + test_list.append(test_class(name)) else: - suite.addTest(loader.loadTestsFromTestCase(test_class)) - verbosity = 2 if matcher else 1 - result = unittest.TextTestRunner(stream=sys.stdout, verbosity=verbosity).run(suite) - return result.wasSuccessful() + test_list.append(loader.loadTestsFromTestCase(test_class)) + + stream = sys.stdout + verbosity = 1 + if matcher and repeat == 1: + verbosity = 2 + elif repeat > 1: + stream = StringIO() + + for _ in range(0, repeat): + suite = unittest.TestSuite() + for test in test_list: + suite.addTest(test) + result = unittest.TextTestRunner(stream=stream, verbosity=verbosity).run(suite) + + if len(result.errors) > 0 or len(result.failures) > 0: + if repeat > 1: + print(stream.getvalue()) + return False + + if repeat > 1: + stream.truncate(0) + + return True diff --git a/dev/version.py b/dev/version.py new file mode 100644 index 0000000..3027431 --- /dev/null +++ b/dev/version.py @@ -0,0 +1,80 @@ +# coding: utf-8 +from __future__ import unicode_literals, division, absolute_import, print_function + +import codecs +import os +import re + +from . import package_root, package_name, has_tests_package + + +def run(new_version): + """ + Updates the package version in the various locations + + :param new_version: + A unicode string of the new library version as a PEP 440 version + + :return: + A bool - if the version number was successfully bumped + """ + + # We use a restricted form of PEP 440 versions + version_match = re.match( + r'(\d+)\.(\d+)\.(\d)+(?:\.((?:dev|a|b|rc)\d+))?$', + new_version + ) + if not version_match: + raise ValueError('Invalid PEP 440 version: %s' % new_version) + + new_version_info = ( + int(version_match.group(1)), + int(version_match.group(2)), + int(version_match.group(3)), + ) + if version_match.group(4): + new_version_info += (version_match.group(4),) + + version_path = os.path.join(package_root, package_name, 'version.py') + setup_path = os.path.join(package_root, 'setup.py') + setup_tests_path = os.path.join(package_root, 'tests', 'setup.py') + tests_path = os.path.join(package_root, 'tests', '__init__.py') + + file_paths = [version_path, setup_path] + if has_tests_package: + file_paths.extend([setup_tests_path, tests_path]) + + for file_path in file_paths: + orig_source = '' + with codecs.open(file_path, 'r', encoding='utf-8') as f: + orig_source = f.read() + + found = 0 + new_source = '' + for line in orig_source.splitlines(True): + if line.startswith('__version__ = '): + found += 1 + new_source += '__version__ = %r\n' % new_version + elif line.startswith('__version_info__ = '): + found += 1 + new_source += '__version_info__ = %r\n' % (new_version_info,) + elif line.startswith('PACKAGE_VERSION = '): + found += 1 + new_source += 'PACKAGE_VERSION = %r\n' % new_version + else: + new_source += line + + if found == 0: + raise ValueError('Did not find any versions in %s' % file_path) + + s = 's' if found > 1 else '' + rel_path = file_path[len(package_root) + 1:] + was_were = 'was' if found == 1 else 'were' + if new_source != orig_source: + print('Updated %d version%s in %s' % (found, s, rel_path)) + with codecs.open(file_path, 'w', encoding='utf-8') as f: + f.write(new_source) + else: + print('%d version%s in %s %s up-to-date' % (found, s, rel_path, was_were)) + + return True @@ -13,12 +13,14 @@ A fast, pure Python library for parsing and serializing ASN.1 structures. - [Continuous Integration](#continuous-integration) - [Testing](#testing) - [Development](#development) + - [CI Tasks](#ci-tasks) +[![GitHub Actions CI](https://github.com/wbond/asn1crypto/workflows/CI/badge.svg)](https://github.com/wbond/asn1crypto/actions?workflow=CI) [![Travis CI](https://api.travis-ci.org/wbond/asn1crypto.svg?branch=master)](https://travis-ci.org/wbond/asn1crypto) [![AppVeyor](https://ci.appveyor.com/api/projects/status/github/wbond/asn1crypto?branch=master&svg=true)](https://ci.appveyor.com/project/wbond/asn1crypto) [![CircleCI](https://circleci.com/gh/wbond/asn1crypto.svg?style=shield)](https://circleci.com/gh/wbond/asn1crypto) [![Codecov](https://codecov.io/gh/wbond/asn1crypto/branch/master/graph/badge.svg)](https://codecov.io/gh/wbond/asn1crypto) -[![PyPI](https://img.shields.io/pypi/v/asn1crypto.svg)](https://pypi.python.org/pypi/asn1crypto) +[![PyPI](https://img.shields.io/pypi/v/asn1crypto.svg)](https://pypi.org/project/asn1crypto/) ## Features @@ -44,8 +46,8 @@ a bunch of ASN.1 structures for use with various common cryptography standards: ## Why Another Python ASN.1 Library? -Python has long had the [pyasn1](https://pypi.python.org/pypi/pyasn1) and -[pyasn1_modules](https://pypi.python.org/pypi/pyasn1-modules) available for +Python has long had the [pyasn1](https://pypi.org/project/pyasn1/) and +[pyasn1_modules](https://pypi.org/project/pyasn1-modules/) available for parsing and serializing ASN.1 structures. While the project does include a comprehensive set of tools for parsing and serializing, the performance of the library can be very poor, especially when dealing with bit fields and parsing @@ -94,7 +96,7 @@ under 8 seconds. With *pyasn1*, using definitions from *pyasn1-modules*, the same parsing took over 4,100 seconds. For smaller structures the performance difference can range from a few times -faster to an order of magnitude of more. +faster to an order of magnitude or more. ## Related Crypto Libraries @@ -110,12 +112,12 @@ faster to an order of magnitude of more. ## Current Release -0.24.0 - [changelog](changelog.md) +1.0.0 - [changelog](changelog.md) ## Dependencies -Python 2.6, 2.7, 3.2, 3.3, 3.4, 3.5, 3.6 or pypy. *No third-party packages -required.* +Python 2.6, 2.7, 3.2, 3.3, 3.4, 3.5, 3.6, 3.7, 3.8 or pypy. *No third-party +packages required.* ## Installation @@ -161,7 +163,15 @@ links to the source for the various pre-defined type classes. ## Testing -Tests are written using `unittest` and require no third-party packages: +Tests are written using `unittest` and require no third-party packages. + +Depending on what type of source is available for the package, the following +commands can be used to run the test suite. + +### Git Repository + +When working within a Git working copy, or an archive of the Git repository, +the full test suite is run via: ```bash python run.py tests @@ -173,6 +183,25 @@ To run only some tests, pass a regular expression as a parameter to `tests`. python run.py tests ocsp ``` +### PyPi Source Distribution + +When working within an extracted source distribution (aka `.tar.gz`) from +PyPi, the full test suite is run via: + +```bash +python setup.py test +``` + +### Package + +When the package has been installed via pip (or another method), the package +`asn1crypto_tests` may be installed and invoked to run the full test suite: + +```bash +pip install asn1crypto_tests +python -m asn1crypto_tests +``` + ## Development To install the package used for linting, execute: @@ -199,6 +228,12 @@ Coverage is measured by running: python run.py coverage ``` +To change the version number of the package, run: + +```bash +python run.py version {pep440_version} +``` + To install the necessary packages for releasing a new version on PyPI, run: ```bash @@ -207,19 +242,22 @@ pip install --user -r requires/release Releases are created by: - - Making a git tag in [semver](http://semver.org/) format + - Making a git tag in [PEP 440](https://www.python.org/dev/peps/pep-0440/#examples-of-compliant-version-schemes) format - Running the command: ```bash python run.py release ``` -Existing releases can be found at https://pypi.python.org/pypi/asn1crypto. +Existing releases can be found at https://pypi.org/project/asn1crypto/. ## CI Tasks -A task named `deps` exists to ensure a modern version of `pip` is installed, -along with all necessary testing dependencies. +A task named `deps` exists to download and stage all necessary testing +dependencies. On posix platforms, `curl` is used for downloads and on Windows +PowerShell with `Net.WebClient` is used. This configuration sidesteps issues +related to getting pip to work properly and messing with `site-packages` for +the version of Python being used. The `ci` task runs `lint` (if flake8 is available for the version of Python) and `coverage` (or `tests` if coverage is not available for the version of Python). diff --git a/requires/lint b/requires/lint index 9c49d4e..f5d0e74 100644 --- a/requires/lint +++ b/requires/lint @@ -1 +1,7 @@ -flake8 ; python_version == '2.7' or python_version >= '3.3' +setuptools == 39.0.1 ; python_version == '2.7' or python_version >= '3.3' +enum34 == 1.1.6 ; python_version == '2.7' or python_version == '3.3' +mccabe == 0.6.1 ; python_version == '2.7' or python_version >= '3.3' +pycodestyle == 2.3.1 ; python_version == '2.7' or python_version >= '3.3' +pyflakes == 1.6.0 ; python_version == '2.7' or python_version >= '3.3' +configparser == 3.5.0 ; python_version == '2.7' +flake8 == 3.5.0 ; python_version == '2.7' or python_version >= '3.3'
\ No newline at end of file diff --git a/requires/release b/requires/release index af996cf..91cff65 100644 --- a/requires/release +++ b/requires/release @@ -1 +1,3 @@ -twine +wheel>=0.31.0 +twine>=1.11.0 +setuptools>=38.6.0 @@ -11,7 +11,7 @@ else: def show_usage(): - print('Usage: run.py (lint | tests [regex] | coverage | deps | ci | release)', file=sys.stderr) + print('Usage: run.py (lint | tests [regex] | coverage | deps | ci | version {pep440_version} | build | release)', file=sys.stderr) sys.exit(1) @@ -29,10 +29,10 @@ if len(sys.argv) < 2 or len(sys.argv) > 3: task, next_arg = get_arg(1) -if task not in set(['lint', 'tests', 'coverage', 'deps', 'ci', 'release']): +if task not in set(['lint', 'tests', 'coverage', 'deps', 'ci', 'version', 'build', 'release']): show_usage() -if task != 'tests' and len(sys.argv) == 3: +if task != 'tests' and task != 'version' and len(sys.argv) == 3: show_usage() params = [] @@ -54,6 +54,16 @@ elif task == 'deps': elif task == 'ci': from dev.ci import run +elif task == 'version': + from dev.version import run + if len(sys.argv) != 3: + show_usage() + pep440_version, next_arg = get_arg(next_arg) + params.append(pep440_version) + +elif task == 'build': + from dev.build import run + elif task == 'release': from dev.release import run @@ -1,9 +1,69 @@ +import codecs import os import shutil +import sys +import warnings + +import setuptools +from setuptools import setup, Command +from setuptools.command.egg_info import egg_info + + +PACKAGE_NAME = 'asn1crypto' +PACKAGE_VERSION = '1.0.0' +PACKAGE_ROOT = os.path.dirname(os.path.abspath(__file__)) + + +# setuptools 38.6.0 and newer know about long_description_content_type, but +# distutils still complains about it, so silence the warning +sv = setuptools.__version__ +svi = tuple(int(o) if o.isdigit() else o for o in sv.split('.')) +if svi >= (38, 6): + warnings.filterwarnings( + 'ignore', + "Unknown distribution option: 'long_description_content_type'", + module='distutils.dist' + ) + + +# Try to load the tests first from the source repository layout. If that +# doesn't work, we assume this file is in the release package, and the tests +# are part of the package {PACKAGE_NAME}_tests. +if os.path.exists(os.path.join(PACKAGE_ROOT, 'tests')): + tests_require = [] + test_suite = 'tests.make_suite' +else: + tests_require = ['%s_tests' % PACKAGE_NAME] + test_suite = '%s_tests.make_suite' % PACKAGE_NAME + + +# This allows us to send the LICENSE and docs when creating a sdist. Wheels +# automatically include the LICENSE, and don't need the docs. For these +# to be included, the command must be "python setup.py sdist". +package_data = {} +if sys.argv[1:] == ['sdist'] or sorted(sys.argv[1:]) == ['-q', 'sdist']: + package_data[PACKAGE_NAME] = [ + '../LICENSE', + '../*.md', + '../docs/*.md', + ] -from setuptools import setup, find_packages, Command -from asn1crypto import version +# Ensures a copy of the LICENSE is included with the egg-info for +# install and bdist_egg commands +class EggInfoCommand(egg_info): + def run(self): + egg_info_path = os.path.join( + PACKAGE_ROOT, + '%s.egg-info' % PACKAGE_NAME + ) + if not os.path.exists(egg_info_path): + os.mkdir(egg_info_path) + shutil.copy2( + os.path.join(PACKAGE_ROOT, 'LICENSE'), + os.path.join(egg_info_path, 'LICENSE') + ) + egg_info.run(self) class CleanCommand(Command): @@ -18,30 +78,38 @@ class CleanCommand(Command): pass def run(self): - folder = os.path.dirname(os.path.abspath(__file__)) - for sub_folder in ['build', 'dist', 'asn1crypto.egg-info']: - full_path = os.path.join(folder, sub_folder) + sub_folders = ['build', 'temp', '%s.egg-info' % PACKAGE_NAME] + if self.all: + sub_folders.append('dist') + for sub_folder in sub_folders: + full_path = os.path.join(PACKAGE_ROOT, sub_folder) if os.path.exists(full_path): shutil.rmtree(full_path) - for root, dirnames, filenames in os.walk(os.path.join(folder, 'asn1crypto')): - for filename in filenames: + for root, dirs, files in os.walk(os.path.join(PACKAGE_ROOT, PACKAGE_NAME)): + for filename in files: if filename[-4:] == '.pyc': os.unlink(os.path.join(root, filename)) - for dirname in list(dirnames): + for dirname in list(dirs): if dirname == '__pycache__': shutil.rmtree(os.path.join(root, dirname)) +readme = '' +with codecs.open(os.path.join(PACKAGE_ROOT, 'readme.md'), 'r', 'utf-8') as f: + readme = f.read() + + setup( - name='asn1crypto', - version=version.__version__, + name=PACKAGE_NAME, + version=PACKAGE_VERSION, description=( 'Fast ASN.1 parser and serializer with definitions for private keys, ' 'public keys, certificates, CRL, OCSP, CMS, PKCS#3, PKCS#7, PKCS#8, ' 'PKCS#12, PKCS#5, X.509 and TSP' ), - long_description='Docs for this project are maintained at https://github.com/wbond/asn1crypto#readme.', + long_description=readme, + long_description_content_type='text/markdown', url='https://github.com/wbond/asn1crypto', @@ -51,19 +119,25 @@ setup( license='MIT', classifiers=[ - 'Development Status :: 4 - Beta', + 'Development Status :: 5 - Production/Stable', 'Intended Audience :: Developers', 'License :: OSI Approved :: MIT License', + 'Programming Language :: Python', + 'Programming Language :: Python :: 2', 'Programming Language :: Python :: 2.6', 'Programming Language :: Python :: 2.7', + 'Programming Language :: Python :: 3', 'Programming Language :: Python :: 3.2', 'Programming Language :: Python :: 3.3', 'Programming Language :: Python :: 3.4', 'Programming Language :: Python :: 3.5', 'Programming Language :: Python :: 3.6', + 'Programming Language :: Python :: 3.7', + 'Programming Language :: Python :: 3.8', + 'Programming Language :: Python :: Implementation :: CPython', 'Programming Language :: Python :: Implementation :: PyPy', 'Topic :: Security :: Cryptography', @@ -71,11 +145,14 @@ setup( keywords='asn1 crypto pki x509 certificate rsa dsa ec dh', - packages=find_packages(exclude=['tests*', 'dev*']), + packages=[PACKAGE_NAME], + package_data=package_data, - test_suite='tests.make_suite', + tests_require=tests_require, + test_suite=test_suite, cmdclass={ 'clean': CleanCommand, + 'egg_info': EggInfoCommand, } ) diff --git a/tests/LICENSE b/tests/LICENSE new file mode 100644 index 0000000..8038d9a --- /dev/null +++ b/tests/LICENSE @@ -0,0 +1,19 @@ +Copyright (c) 2015-2019 Will Bond <will@wbond.net> + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies +of the Software, and to permit persons to whom the Software is furnished to do +so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/tests/__init__.py b/tests/__init__.py index 783a20f..d267878 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -6,6 +6,44 @@ import os import unittest +__version__ = '1.0.0' +__version_info__ = (1, 0, 0) + + +def _import_from(mod, path, mod_dir=None): + """ + Imports a module from a specific path + + :param mod: + A unicode string of the module name + + :param path: + A unicode string to the directory containing the module + + :param mod_dir: + If the sub directory of "path" is different than the "mod" name, + pass the sub directory as a unicode string + + :return: + None if not loaded, otherwise the module + """ + + if mod_dir is None: + mod_dir = mod + + if not os.path.exists(path): + return None + + if not os.path.exists(os.path.join(path, mod_dir)): + return None + + try: + mod_info = imp.find_module(mod_dir, [path]) + return imp.load_module(mod, *mod_info) + except ImportError: + return None + + def make_suite(): """ Constructs a unittest.TestSuite() of all tests for the package. For use @@ -31,11 +69,24 @@ def test_classes(): A list of unittest.TestCase classes """ - # Make sure the module is loaded from this source folder - module_name = 'asn1crypto' - src_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..') - module_info = imp.find_module(module_name, [src_dir]) - imp.load_module(module_name, *module_info) + # If we are in a source folder and these tests aren't installed as a + # package, we want to load asn1crypto from this source folder + tests_dir = os.path.dirname(os.path.abspath(__file__)) + + asn1crypto = None + if os.path.basename(tests_dir) == 'tests': + asn1crypto = _import_from( + 'asn1crypto', + os.path.join(tests_dir, '..') + ) + if asn1crypto is None: + import asn1crypto + + if asn1crypto.__version__ != __version__: + raise AssertionError( + ('asn1crypto_tests version %s can not be run with ' % __version__) + + ('asn1crypto version %s' % asn1crypto.__version__) + ) from .test_algos import AlgoTests from .test_cms import CMSTests diff --git a/tests/__main__.py b/tests/__main__.py new file mode 100644 index 0000000..644391e --- /dev/null +++ b/tests/__main__.py @@ -0,0 +1,14 @@ +# coding: utf-8 +from __future__ import unicode_literals, division, absolute_import, print_function + +import sys +import unittest + +from . import test_classes + + +suite = unittest.TestSuite() +loader = unittest.TestLoader() +for test_class in test_classes(): + suite.addTest(loader.loadTestsFromTestCase(test_class)) +unittest.TextTestRunner(stream=sys.stdout, verbosity=2).run(suite) diff --git a/tests/fixtures/aesccm_algo.der b/tests/fixtures/aesccm_algo.der Binary files differnew file mode 100644 index 0000000..74c7d7f --- /dev/null +++ b/tests/fixtures/aesccm_algo.der diff --git a/tests/fixtures/invalid_email_tag.pem b/tests/fixtures/invalid_email_tag.pem new file mode 100644 index 0000000..26a9e7e --- /dev/null +++ b/tests/fixtures/invalid_email_tag.pem @@ -0,0 +1,19 @@ +-----BEGIN CERTIFICATE----- +MIIDvjCCAqigAwIBAgIEVXFPzTALBgkqhkiG9w0BAQUwgaExCzAJBgNVBAYMAkRFMRMwEQYDVQQIDAp +UaHVlcmluZ2VuMQ8wDQYDVQQHDAZFcmZ1cnQxEjAQBgNVBAoMCUtleXdlYiBBRzEWMBQGA1UECwwNS2 +V5SGVscCBQYW5lbDEhMB8GA1UEAwwYa20zNDkyNy0wMS5rZXltYWNoaW5lLmRlMR0wGwYJKoZIhvcNA +QkBDA5pbmZvQGtleXdlYi5kZTAiGA8yMDE1MDYwNTA3MjkxN1oYDzIwMjUwNjA1MDcyOTE3WjCBoTEL +MAkGA1UEBgwCREUxEzARBgNVBAgMClRodWVyaW5nZW4xDzANBgNVBAcMBkVyZnVydDESMBAGA1UECgw +JS2V5d2ViIEFHMRYwFAYDVQQLDA1LZXlIZWxwIFBhbmVsMSEwHwYDVQQDDBhrbTM0OTI3LTAxLmtleW +1hY2hpbmUuZGUxHTAbBgkqhkiG9w0BCQEMDmluZm9Aa2V5d2ViLmRlMIIBIDALBgkqhkiG9w0BAQEDg +gEPADCCAQoCggEBAK4HS2XPemRfn+wARSP885DfHn05+JDQe6KChNwMlKWwGMnwIwJh1ysyvgcNAYyB +1uAB3DbAS0qkeVCGr7spQApD8DIk5GpuNmsrWm7s4/auwDiaUxxpj2tDGBGSNeaN36c+f1vKpED3SQd +vTPLeKX0X3mx/yQbylQsfKrydgUr4nlfXa+QZHLHS+FvCDV+h1FzDb5Wdu4lLF0mdudo7rOBQsTVacc +EfvCT79r1NocWL0/J6wpa4/Qs0cbxkTHCwgr9X7YpMBWtTmQl1wr/eN+yf9bx6xzCmRGJMD9Sjp2MUU +TEfrQcLkB6aGWGgFYzHqUggX3kJPMHxw9pQq6Nz7ecCAwEAATALBgkqhkiG9w0BAQUDggEBAGE05k2e +j7uKWTfkc6I0snDD/nqYlECCpkgAtZO0HKasihrICHewlyZt7hTMQVo77gZ3oeH5X1uJGGV5ITrrWDV +HciYDGAeYSKWjYnZ2KtaC4UQN5+UBUkd59YDF+SYN34pqE+5N+UmfQDPOJX7C9v+sWiK9HouVKjNRNH +At0Ncz/Huwh5GOxMJfPaMFD5yJC4HYtj0uh5nTMppm+N/EKU3hXGtERXcRbRep8ovcwz1PpOkp+pnW9 +FJFjFfKRYdctzZ57g+GpQef2REUjasYJDXXObu9GDeanluZqINhd/7Qsoeygk5bcbsjOdpNJE/+aGCl +tJBWsWFfkSV1KM4Rg1E= +-----END CERTIFICATE----- diff --git a/tests/fixtures/keys/test-public-rsapss-der.key b/tests/fixtures/keys/test-public-rsapss-der.key Binary files differnew file mode 100644 index 0000000..c7937be --- /dev/null +++ b/tests/fixtures/keys/test-public-rsapss-der.key diff --git a/tests/fixtures/keys/test-public-rsapss.key b/tests/fixtures/keys/test-public-rsapss.key new file mode 100644 index 0000000..8e9a068 --- /dev/null +++ b/tests/fixtures/keys/test-public-rsapss.key @@ -0,0 +1,9 @@ +-----BEGIN PUBLIC KEY----- +MIIBIDALBgkqhkiG9w0BAQoDggEPADCCAQoCggEBAKEVLWOTtyNDiAZXb8NqFwmB +PWVmm9mDMTNaptwozSd6OQXPJcXzu8zpwCwWdk3QehrWca3nN0qXN6Hq+pEVjD6/ +QQRMxBQwJeHfTuh8Ci4nz8Xw7gMxR2k36kK01pN/6pdW2S4c4a1Zut7g9zbYIH9U +U1jHMPcqP3I6zBrW5WO5n4XoH5ME+xpIlMJLWCd4X8/xSY2IhY0/ssYCnPLvMqGj +Opw6nwurdPH9zwQvzE8K++8OtfmTQDBUyf4w861qiYjCCdBnj7sjCnTWFggKg9Tt +ot5xPF7bStKEyC3N5HXz3Y8H5jBY8rIYqs2WE+wIBM4s7LxqJ5pyxCmE82dJ+fUC +AwEAAQ== +-----END PUBLIC KEY----- diff --git a/tests/fixtures/keys/test-rsapss-der.key b/tests/fixtures/keys/test-rsapss-der.key Binary files differnew file mode 100644 index 0000000..4449d65 --- /dev/null +++ b/tests/fixtures/keys/test-rsapss-der.key diff --git a/tests/fixtures/keys/test-rsapss.crt b/tests/fixtures/keys/test-rsapss.crt new file mode 100644 index 0000000..4fc9b3f --- /dev/null +++ b/tests/fixtures/keys/test-rsapss.crt @@ -0,0 +1,20 @@ +-----BEGIN CERTIFICATE----- +MIIDWzCCAhKgAwIBAgIURizKkLO5lJkypU9NL3yhfR8mUY0wPgYJKoZIhvcNAQEK +MDGgDTALBglghkgBZQMEAgGhGjAYBgkqhkiG9w0BAQgwCwYJYIZIAWUDBAIBogQC +AgDeMA0xCzAJBgNVBAMMAkNBMB4XDTE5MDkyMTEwMjcyNFoXDTI5MDkxODEwMjcy +NFowDTELMAkGA1UEAwwCQ0EwggEgMAsGCSqGSIb3DQEBCgOCAQ8AMIIBCgKCAQEA +oRUtY5O3I0OIBldvw2oXCYE9ZWab2YMxM1qm3CjNJ3o5Bc8lxfO7zOnALBZ2TdB6 +GtZxrec3Spc3oer6kRWMPr9BBEzEFDAl4d9O6HwKLifPxfDuAzFHaTfqQrTWk3/q +l1bZLhzhrVm63uD3Ntggf1RTWMcw9yo/cjrMGtblY7mfhegfkwT7GkiUwktYJ3hf +z/FJjYiFjT+yxgKc8u8yoaM6nDqfC6t08f3PBC/MTwr77w61+ZNAMFTJ/jDzrWqJ +iMIJ0GePuyMKdNYWCAqD1O2i3nE8XttK0oTILc3kdfPdjwfmMFjyshiqzZYT7AgE +zizsvGonmnLEKYTzZ0n59QIDAQABo1MwUTAdBgNVHQ4EFgQUR55Wi9L1KUAfez/M +O/5Z+sDOmn8wHwYDVR0jBBgwFoAUR55Wi9L1KUAfez/MO/5Z+sDOmn8wDwYDVR0T +AQH/BAUwAwEB/zA+BgkqhkiG9w0BAQowMaANMAsGCWCGSAFlAwQCAaEaMBgGCSqG +SIb3DQEBCDALBglghkgBZQMEAgGiBAICAN4DggEBABz2Sub4E5RWr0VVSu/l1gLR +/XmT13AJDqjJ6dyfjMWV8bxVHZAXXBhJk7OMxTkEpHINbcoBEsQdtbQ2lkX7S5fI +7Oyz+Du1ux5uCVRHVjeqEjVkmxuODxPVu4y57Ix6UDL2zDoqCeQcT3V4kw3SqyJn +znv/1OaQ5+20QbHqWEQtjUYv2VyDBE3QqXylKWy1V5YxJJ8g3yBHQxN/+c7o8mti +leTw6Nw2hyunVUmIE07uUwgbwrhck5DQGWqpmsI9D2HugJH0whlCvHjpQUVPEAkL +aNYaelnJ56t6tnIXZEVrtPh7oOyEZWnnj6q3moR/annXkdox5NbQlGLRQDR3+EA= +-----END CERTIFICATE----- diff --git a/tests/fixtures/keys/test-rsapss.key b/tests/fixtures/keys/test-rsapss.key new file mode 100644 index 0000000..2ae4c5b --- /dev/null +++ b/tests/fixtures/keys/test-rsapss.key @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvAIBADALBgkqhkiG9w0BAQoEggSoMIIEpAIBAAKCAQEAoRUtY5O3I0OIBldv +w2oXCYE9ZWab2YMxM1qm3CjNJ3o5Bc8lxfO7zOnALBZ2TdB6GtZxrec3Spc3oer6 +kRWMPr9BBEzEFDAl4d9O6HwKLifPxfDuAzFHaTfqQrTWk3/ql1bZLhzhrVm63uD3 +Ntggf1RTWMcw9yo/cjrMGtblY7mfhegfkwT7GkiUwktYJ3hfz/FJjYiFjT+yxgKc +8u8yoaM6nDqfC6t08f3PBC/MTwr77w61+ZNAMFTJ/jDzrWqJiMIJ0GePuyMKdNYW +CAqD1O2i3nE8XttK0oTILc3kdfPdjwfmMFjyshiqzZYT7AgEzizsvGonmnLEKYTz +Z0n59QIDAQABAoIBAQCc76769u1UM/UQiJtgvbmYDwwsAJ4Sepiyub0bfbzym0d2 ++2yHwYDUkWAjE/dKtLRh9U9n6H6b81vGKtLYCzBJ6beEYu4d5RLjTtbn9gFNGoh5 +BtQ81AQI5Osc9maf6d46d+i73nOYmnVPs8nm6wYuR4+0TMzN4aFSvyofdAKk9qZP +FWY1Vexi4diiChE+HytJ1jtQZIVmTd55oK5HG7tD1seYR7J2F91+KNg3CVC1/y3/ +JhoTDtDeeWtwTnOKafdOqmI4xQu0mZgo0nt/w+PoFGo7pmUv7RWY70qHBO63txCs +c7pX+tn9PERbCcOncAg5yNdC31TKMCSZT6vKz945AoGBAM6TThUqoB5UlrgfPL+1 +6xqMomU4L5OA1M1N3PctiLfe9CmMO/8gh8j0uWsCJp9I6nlg4tIJAYyvd+sa/UzF +Vmf5cyyDEEMuxFYo7UAz7AbcCho4QxSDYvrmp3muFKYRtF/tD69TWrGfq+/QCKXX +7CUElZDjlScqREhQioFJ/Xe7AoGBAMefdL103NlF15mqGD62ZkkZkoB08TDvxPPK +/A3voqCDwpxYPSDSS7o04EphX4gCB6K90ZgHd53Ihox2osDDgcPht+xTICC4ETEL +EST7KkDIhKVpjDRr5Tej7q1wsMbklvVMkVywZ2WQrfsis/tI4b9N10fI/hdj0iTr +AvD2OYIPAoGBAKhc+pjZwuK1gpnSK8r8U/+xe3IP1wbbS5WAzoVOwU1LE6kBOKz5 +MHIiszR57kyIO0JuKq+Q04h8QrqFpsj5VTEs6CfxMkHvTeoDNUrMhqQYlstD67g+ +VV+0ue68aOvpJh/AsLXus85tGs87uLCiST7qe5Q5SIlBM6HUsu4pBcKdAoGAUP+C +ft1MP1z3foJmFAwutLqLl1PcCd9AKyvR2lXBxx+vd4DWTNsHnyaVW5jnCmjIcGBV +Czr8bilPbu80WsL5hGGyH1IbVytYzm2PJ1JCcsbqC7QoD504BLufvQBculdGaYIH ++XQagDuUXLJYFT4dW2JaV+ZWM2dtfU1ehCdkbkECgYAhcAkEP8F5W7uW9hgx/gAg +9gKA/YleJv0gwP+wxKMBkX1OlRuZViN04LcdiVh1uyqtnSa1EwNZn69OHq0bLTXa +F986zj/zUuj4yFNKxnSq3FDISglMo5Ua6HDHT57Dn5vbUnln+Tiq0En5k/8neOJT +TEIWZO5wFQeL1N0l4Nai0g== +-----END PRIVATE KEY----- diff --git a/tests/fixtures/keys/test-validity.crt b/tests/fixtures/keys/test-validity.crt new file mode 100644 index 0000000..a687ecf --- /dev/null +++ b/tests/fixtures/keys/test-validity.crt @@ -0,0 +1,21 @@ +-----BEGIN CERTIFICATE----- +MIIDYjCCAkqgAwIBAgIJAOChyeZ0Qi1CMA0GCSqGSIb3DQEBCwUAMEUxCzAJBgNV +BAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBX +aWRnaXRzIFB0eSBMdGQwIBcNMTgwMjIxMTIyNzM5WhgPMjExODAxMjgxMjI3Mzla +MEUxCzAJBgNVBAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJ +bnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAw +ggEKAoIBAQCTsr+GpZTdUCIvkIKVfhfV8GGMJaB1ctRaMXT9CqoZVfmogXemavbN +KV8ezn3/Hul7btilksz/9rB8QPgWmBf4n0vUXhQyGWKUnfQr63knhuN13KyspD4o +St8gFzHsZhHlIENLl9fafqSAKW8oqZA6ZeNTR4ZA7ye2IsGvBzwZwa4bxZSVcQ7o +hacNCP9/gNI/UuVIOHhmpBFuIJ9qWYH8NTQrjY0DCgrfhMZjySWqm3BsDjsm2UPJ +z83QzAcSMKGztWbJVzgY5X3Zykk5qfXMYeFJ8ro38Vah8KXEVto9cZinlNSpXmKE +cKTq8hmXf97KgYyEayLOMHA46Kk6kpG7AgMBAAGjUzBRMB0GA1UdDgQWBBTbexgn +2KU8SP3YucrZS6Z/XOyLFzAfBgNVHSMEGDAWgBTbexgn2KU8SP3YucrZS6Z/XOyL +FzAPBgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4IBAQCFjzXXqtdlUGtF +PKAMmBaQFgjKu98cTrXywcyVMBkw+dm532xuNR6Y2B95pewb1ctoAeLLDFJB5OxB +/yPzEt79y1KCzI2FtU7ecLK0W5AZC80kru1Gmkt5mygTuzZfjj84ayrOfxR2wvkB +rqItoYzc1+VeJWtO0RftoXtJRMBLYaW38NIMRsqdni6YCMa/xL38G7RMDJEgvOvs +16P57HGVvwRMPKRz26PKWdL7/Ewvjxn3P9NxQI/dkaFye4zwKaxMoDyRIrpt43pV +VSFm1ro2TVFVkE2TTV8SA8NWhZB6n2u6sc0zGwGwH71sx37AYTga+PDBG/+FRGc/ +JbQXTwyP +-----END CERTIFICATE----- diff --git a/tests/readme.md b/tests/readme.md new file mode 100644 index 0000000..930f7cb --- /dev/null +++ b/tests/readme.md @@ -0,0 +1,9 @@ +# asn1crypto_tests + +Run the test suite via: + +```bash +python -m asn1crypto_tests +``` + +Full documentation a <https://github.com/wbond/asn1crypto#readme>. diff --git a/tests/setup.py b/tests/setup.py new file mode 100644 index 0000000..9f69479 --- /dev/null +++ b/tests/setup.py @@ -0,0 +1,157 @@ +import codecs +import os +import shutil +import sys +import warnings + +import setuptools +from setuptools import setup, Command +from setuptools.command.egg_info import egg_info + + +PACKAGE_NAME = 'asn1crypto' +PACKAGE_VERSION = '1.0.0' +TEST_PACKAGE_NAME = '%s_tests' % PACKAGE_NAME +TESTS_ROOT = os.path.dirname(os.path.abspath(__file__)) +PACKAGE_ROOT = os.path.abspath(os.path.join(TESTS_ROOT, '..')) + + +# setuptools 38.6.0 and newer know about long_description_content_type, but +# distutils still complains about it, so silence the warning +sv = setuptools.__version__ +svi = tuple(int(o) if o.isdigit() else o for o in sv.split('.')) +if svi >= (38, 6): + warnings.filterwarnings( + 'ignore', + "Unknown distribution option: 'long_description_content_type'", + module='distutils.dist' + ) + + +# Older versions of distutils would take a glob pattern and return dirs +# and then would complain that it couldn't copy a dir like a file, so we +# have to build an explicit list of file names +data_files = [] +fixtures_dir = os.path.join(TESTS_ROOT, 'fixtures') +for root, dirs, files in os.walk(fixtures_dir): + for filename in files: + data_files.append(os.path.join(root, filename)[len(TESTS_ROOT) + 1:]) +package_data = { + TEST_PACKAGE_NAME: data_files +} +# This allows us to send the LICENSE when creating a sdist. Wheels +# automatically include the license, and don't need the docs. For these +# to be included, the command must be "python setup.py sdist". +if sys.argv[1:] == ['sdist'] or sorted(sys.argv[1:]) == ['-q', 'sdist']: + package_data[TEST_PACKAGE_NAME].extend([ + 'LICENSE', + 'readme.md', + ]) + + +# Ensures a copy of the LICENSE is included with the egg-info for +# install and bdist_egg commands +class EggInfoCommand(egg_info): + def run(self): + egg_info_path = os.path.join( + TESTS_ROOT, + '%s.egg-info' % TEST_PACKAGE_NAME + ) + if not os.path.exists(egg_info_path): + os.mkdir(egg_info_path) + shutil.copy2( + os.path.join(PACKAGE_ROOT, 'LICENSE'), + os.path.join(egg_info_path, 'LICENSE') + ) + egg_info.run(self) + + +class CleanCommand(Command): + user_options = [ + ('all', 'a', '(Compatibility with original clean command)'), + ] + + def initialize_options(self): + self.all = False + + def finalize_options(self): + pass + + def run(self): + sub_folders = ['build', 'temp', '%s.egg-info' % TEST_PACKAGE_NAME] + if self.all: + sub_folders.append('dist') + for sub_folder in sub_folders: + full_path = os.path.join(TESTS_ROOT, sub_folder) + if os.path.exists(full_path): + shutil.rmtree(full_path) + for root, dirs, files in os.walk(TESTS_ROOT): + for filename in files: + if filename[-4:] == '.pyc': + os.unlink(os.path.join(root, filename)) + for dirname in list(dirs): + if dirname == '__pycache__': + shutil.rmtree(os.path.join(root, dirname)) + + +readme = '' +with codecs.open(os.path.join(TESTS_ROOT, 'readme.md'), 'r', 'utf-8') as f: + readme = f.read() + + +setup( + name=TEST_PACKAGE_NAME, + version=PACKAGE_VERSION, + + description=( + 'Test suite for asn1crypto, separated due to file size' + ), + long_description=readme, + long_description_content_type='text/markdown', + + url='https://github.com/wbond/asn1crypto', + + author='wbond', + author_email='will@wbond.net', + + license='MIT', + + classifiers=[ + 'Development Status :: 5 - Production/Stable', + + 'Intended Audience :: Developers', + + 'License :: OSI Approved :: MIT License', + + 'Programming Language :: Python', + 'Programming Language :: Python :: 2', + 'Programming Language :: Python :: 2.6', + 'Programming Language :: Python :: 2.7', + 'Programming Language :: Python :: 3', + 'Programming Language :: Python :: 3.2', + 'Programming Language :: Python :: 3.3', + 'Programming Language :: Python :: 3.4', + 'Programming Language :: Python :: 3.5', + 'Programming Language :: Python :: 3.6', + 'Programming Language :: Python :: 3.7', + 'Programming Language :: Python :: 3.8', + 'Programming Language :: Python :: Implementation :: CPython', + 'Programming Language :: Python :: Implementation :: PyPy', + + 'Topic :: Security :: Cryptography', + ], + + keywords='asn1 crypto pki x509 certificate rsa dsa ec dh', + packages=[TEST_PACKAGE_NAME], + package_dir={TEST_PACKAGE_NAME: '.'}, + package_data=package_data, + + install_requires=[ + '%s==%s' % (PACKAGE_NAME, PACKAGE_VERSION), + ], + + cmdclass={ + 'clean': CleanCommand, + 'egg_info': EggInfoCommand, + } +) diff --git a/tests/test_algos.py b/tests/test_algos.py index a3550af..37b2d15 100644 --- a/tests/test_algos.py +++ b/tests/test_algos.py @@ -31,3 +31,14 @@ class AlgoTests(unittest.TestCase): def test_digest_parameters(self): sha1 = algos.DigestAlgorithm({'algorithm': 'sha1'}) self.assertEqual(core.Null, sha1['parameters'].__class__) + + def test_ccm_parameters(self): + with open(os.path.join(fixtures_dir, 'aesccm_algo.der'), 'rb') as f: + # PBES2 AlgorithmIdentifier + algo = algos.EncryptionAlgorithm().load(f.read()) + scheme = algo['parameters']['encryption_scheme'] + self.assertEqual(scheme['parameters'].__class__, algos.CcmParams) + self.assertEqual(scheme['parameters']['aes_nonce'].__class__, core.OctetString) + self.assertEqual(scheme['parameters']['aes_nonce'].native, b'z\xb7\xbd\xb7\xe1\xc6\xc0\x11\xc1?\xf00') + self.assertEqual(scheme['parameters']['aes_icvlen'].__class__, core.Integer) + self.assertEqual(scheme['parameters']['aes_icvlen'].native, 8) diff --git a/tests/test_cms.py b/tests/test_cms.py index a6746fa..2afd7ca 100644 --- a/tests/test_cms.py +++ b/tests/test_cms.py @@ -7,7 +7,7 @@ import zlib import sys from datetime import datetime -from asn1crypto import cms, util, core +from asn1crypto import cms, util from ._unittest_compat import patch patch() @@ -279,7 +279,7 @@ class CMSTests(unittest.TestCase): recipient['rid'].native ) self.assertEqual( - 'rsa', + 'rsaes_pkcs1v15', recipient['key_encryption_algorithm']['algorithm'].native ) self.assertEqual( diff --git a/tests/test_core.py b/tests/test_core.py index 94fd8aa..aaff9f5 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -3,7 +3,7 @@ from __future__ import unicode_literals, division, absolute_import, print_functi import unittest import os -from datetime import datetime +from datetime import datetime, timedelta from asn1crypto import core, util @@ -56,6 +56,30 @@ class CopySeq(core.Sequence): ] +class NestSeqAny(core.Sequence): + _fields = [ + ('id', core.ObjectIdentifier), + ('value', core.Any), + ] + + _oid_pair = ('id', 'value') + _oid_specs = { + '3.4.5': Seq, + } + + +class NestSeqExplicit(core.Sequence): + _fields = [ + ('id', core.ObjectIdentifier), + ('value', NamedBits), + ] + + _oid_pair = ('id', 'value') + _oid_specs = { + '3.4.5': Seq, + } + + class Enum(core.Enumerated): _map = { 0: 'a', @@ -100,6 +124,19 @@ class SeqChoiceOldApi(core.Choice): ] +class ChoiceChoice(core.Choice): + _alternatives = [ + ('num', NumChoice, {'explicit': 0}), + ('seq', SeqChoice, {'explicit': 1}), + ] + + +class CCSeq(core.Sequence): + _fields = [ + ('cc', ChoiceChoice) + ] + + class ExplicitField(core.Sequence): _fields = [ ('field', NumChoice, {'tag_type': 'explicit', 'tag': 0}), @@ -144,6 +181,7 @@ class MyOids(core.ObjectIdentifier): '4.5.6': 'def', } + class ApplicationTaggedInteger(core.Integer): # This class attribute may be a 2-element tuple of integers, # or a tuple of 2-element tuple of integers. The first form @@ -190,9 +228,102 @@ class ApplicationTaggedOuter(core.Sequence): ] +class SpcPeImageFlags(core.BitString): + _map = { + 0: "includeResources", + 1: "includeDebugInfo", + 2: "includeImportAddressTable", + } + + +class SpcSerializedObject(core.Sequence): + _fields = [ + ("classId", core.OctetString), + ("serializedData", core.OctetString), + ] + + +class SpcString(core.Choice): + _alternatives = [ + ("unicode", core.BMPString, {"implicit": 0}), + ("ascii", core.IA5String, {"implicit": 1}), + ] + + +class SpcLink(core.Choice): + _alternatives = [ + ("url", core.IA5String, {"implicit": 0}), + ("moniker", SpcSerializedObject, {"implicit": 1}), + ("file", SpcString, {"explicit": 2}) + ] + + +class SpcPeImageData(core.Sequence): + _fields = [ + ("flags", SpcPeImageFlags, {"default": "includeResources"}), + ("file", SpcLink, {"explicit": 0}) + ] + + +class UTF8Sequence(core.Sequence): + _fields = [ + ("string", core.UTF8String) + ] + + +class NestedUTF8Sequence(core.Sequence): + _fields = [ + ("seq", UTF8Sequence) + ] + + @data_decorator class CoreTests(unittest.TestCase): + def test_large_tag_encode(self): + # https://misc.daniel-marschall.de/asn.1/oid_facts.html + v = core.Primitive(tag=31, contents=b'') + self.assertEqual(b'\x1f\x1f\x00', v.dump()) + + v = core.Primitive(tag=36, contents=b'') + self.assertEqual(b'\x1f\x24\x00', v.dump()) + + # One extra byte + v = core.Primitive( + class_="application", + method="constructed", + tag=73, + contents=b'' + ) + self.assertEqual(b'\x7f\x49\x00', v.dump()) + + # Two extra bytes + v = core.Primitive( + class_="application", + method="constructed", + tag=201, + contents=b'' + ) + self.assertEqual(b'\x7f\x81\x49\x00', v.dump()) + + # Three extra bytes + v = core.Primitive( + class_="application", + method="constructed", + tag=16384, + contents=b'' + ) + self.assertEqual(b'\x7f\x81\x80\x00\x00', v.dump()) + + def test_manual_construction(self): + v = core.Asn1Value( + class_="application", + method="constructed", + tag=1, + contents=b'' + ) + self.assertEqual(b'\x61\x00', v.dump()) + def test_sequence_spec(self): seq = Seq() seq['id'] = '1.2.3' @@ -255,6 +386,8 @@ class CoreTests(unittest.TestCase): (datetime(2030, 12, 31, 8, 30, 0, tzinfo=util.timezone.utc), b'\x17\x0D301231083000Z'), (datetime(2049, 12, 31, 8, 30, 0, tzinfo=util.timezone.utc), b'\x17\x0D491231083000Z'), (datetime(1950, 12, 31, 8, 30, 0, tzinfo=util.timezone.utc), b'\x17\x0D501231083000Z'), + (datetime(2018, 10, 20, 7, 35, 4, tzinfo=util.timezone(timedelta(hours=7, minutes=40))), + b'\x17\x0D181019235504Z'), ) @data('utctime_info') @@ -263,6 +396,66 @@ class CoreTests(unittest.TestCase): self.assertEqual(der_bytes, u.dump()) self.assertEqual(native, core.UTCTime.load(der_bytes).native) + def test_utctime_errors(self): + with self.assertRaises(ValueError): + # is not aware + core.UTCTime(datetime.fromtimestamp(1234567890)) + + with self.assertRaises(ValueError): + # Is pre 1950 + core.UTCTime(datetime(1910, 6, 22, 11, 33, 44, tzinfo=util.timezone.utc)) + + with self.assertRaises(ValueError): + # Is past 2050 + core.UTCTime(datetime(2106, 2, 7, 6, 28, 16, tzinfo=util.timezone.utc)) + + @staticmethod + def generalized_time_info(): + def tz(hours, minutes=0): + return util.create_timezone(timedelta(hours=hours, minutes=minutes)) + + return ( + (b'\x18\x1520180405062426.0+0200', datetime(2018, 4, 5, 6, 24, 26, 0, tz(2)), b'\x18\x0f20180405042426Z'), + (b'\x18\x0f2018062419-1355', datetime(2018, 6, 24, 19, 0, 0, 0, tz(-13, -55)), b'\x18\x0f20180625085500Z'), + (b'\x18\x0d2018062419-13', datetime(2018, 6, 24, 19, 0, 0, 0, tz(-13)), b'\x18\x0f20180625080000Z'), + (b'\x18\x0b2018062419Z', datetime(2018, 6, 24, 19, 0, 0, 0, tz(0)), b'\x18\x0f20180624190000Z'), + (b'\x18\x122018062419.15+0345', datetime(2018, 6, 24, 19, 9, 0, 0, tz(3, 45)), b'\x18\x0f20180624152400Z'), + ( + b'\x18\x13201806241957,433+02', + datetime(2018, 6, 24, 19, 57, 25, 980000, tz(2)), + b'\x18\x1220180624175725.98Z', + ), + ( + b'\x18\x1620180624195724.215999Z', + datetime(2018, 6, 24, 19, 57, 24, 215999, tz(0)), + b'\x18\x1620180624195724.215999Z', + ), + ( + b'\x18\x150000022910.31337-0815', + util.extended_datetime(0, 2, 29, 10, 18, 48, 132000, tz(-8, -15)), + b'\x18\x1300000229183348.132Z', + ), + (b'\x18\x1520180624195724.215999', datetime(2018, 6, 24, 19, 57, 24, 215999), None), + (b'\x18\x0a2018062419', datetime(2018, 6, 24, 19, 0, 0, 0), None), + ) + + @data('generalized_time_info') + def generalized_time(self, ber_bytes, native, der_bytes): + decoded = core.GeneralizedTime.load(ber_bytes) + + self.assertEqual(decoded.native, native) + self.assertEqual(decoded.native.tzinfo, native.tzinfo) + + if der_bytes is not None: + encoded = core.GeneralizedTime(native).dump() + self.assertEqual(encoded, der_bytes) + + decoded2 = core.GeneralizedTime.load(encoded) + self.assertEqual(decoded2.native, native) + else: + with self.assertRaises(ValueError): + encoded = core.GeneralizedTime(native).dump() + @staticmethod def type_info(): return ( @@ -278,6 +471,21 @@ class CoreTests(unittest.TestCase): self.assertEqual(native, parsed.native) self.assertEqual(der, parsed.dump(force=True)) + def test_int_to_bit_tuple(self): + self.assertEqual((), core._int_to_bit_tuple(0, 0)) + self.assertEqual((0,), core._int_to_bit_tuple(0, 1)) + self.assertEqual((1,), core._int_to_bit_tuple(1, 1)) + self.assertEqual((0, 0), core._int_to_bit_tuple(0, 2)) + self.assertEqual((0, 1), core._int_to_bit_tuple(1, 2)) + self.assertEqual((0, 0, 1), core._int_to_bit_tuple(1, 3)) + self.assertEqual((0, 1, 0), core._int_to_bit_tuple(2, 3)) + self.assertEqual((1, 0, 1), core._int_to_bit_tuple(5, 3)) + + with self.assertRaises(ValueError): + core._int_to_bit_tuple(9, 3) + with self.assertRaises(ValueError): + core._int_to_bit_tuple(-9, 5) + @staticmethod def bit_string_info(): return ( @@ -293,6 +501,27 @@ class CoreTests(unittest.TestCase): self.assertEqual(der_bytes, bs.dump()) self.assertEqual(native, core.BitString.load(der_bytes).native) + def test_bit_string_load_dump(self): + bs = core.BitString.load(b'\x03\x01\x00') + self.assertEqual(tuple(), bs.native) + self.assertEqual(b'\x03\x01\x00', bs.dump(True)) + + @staticmethod + def bit_string_error_values(): + return ( + # unused bits in empty bit string + (b'\x03\x01\x05',), + # too many unused bits + (b'\x03\x03\x0e\x0c\x00',), + # chunk with unused bits is not last chunk + (b'\x23\x80\x03\x02\x01\xfe\x03\x02\x00\x55\x00\x00',), + ) + + @data('bit_string_error_values') + def bit_string_errors(self, enc_bytes): + with self.assertRaises(ValueError): + core.BitString.load(enc_bytes).native + def test_cast(self): a = core.OctetBitString(b'\x00\x01\x02\x03') self.assertEqual(b'\x00\x01\x02\x03', a.native) @@ -351,6 +580,19 @@ class CoreTests(unittest.TestCase): with self.assertRaises(ValueError): NumChoiceOldApi.load(b'\xA0\x03\x02\x01\x00\x00', strict=True) + def test_choice_parse_return(self): + nc = NumChoice.load(b'\xA0\x03\x02\x01\x00\x00') + nc._parsed = None + self.assertEqual(0, nc.parse().native) + + def test_sequece_choice_choice(self): + CCSeq({ + 'cc': ChoiceChoice( + 'num', + NumChoice('one', core.Integer(0)) + ) + }) + def test_bit_string_item_access(self): named = core.BitString() named[0] = True @@ -594,6 +836,104 @@ class CoreTests(unittest.TestCase): choice2.chosen['name'] = 'bar' self.assertNotEqual(choice2.chosen['name'], choice2_copy.chosen['name']) + def test_dump_ber_indefinite(self): + # A simple primitive type that is indefinite-length-encoded will be + # automatically re-encoded to DER encoding + data = b'\x2C\x80\x0C\x03foo\x00\x00' + v = core.UTF8String.load(data) + self.assertEqual(True, v._indefinite) + self.assertEqual('foo', v.native) + self.assertEqual(b'\x0C\x03foo', v.dump()) + + # In this case the indefinite length items are nested, and the + # top-level item is fixed-length, so it won't get automatically + # re-encoded + data = b'\x30\x0d\x30\x80\x2C\x80\x0C\x03foo\x00\x00\x00\x00' + v = NestedUTF8Sequence.load(data) + self.assertEqual(data, v.dump()) + + # Here both the top-level and the nested encoding will get fixed since + # the top-level being indefinitely triggers a full re-encoding + data = b'\x30\x80\x30\x09\x2C\x80\x0C\x03foo\x00\x00\x00\x00' + v = NestedUTF8Sequence.load(data) + self.assertEqual(b'\x30\x07\x30\x05\x0C\x03foo', v.dump()) + + def test_copy_indefinite(self): + v = core.BitString.load(b'\x23\x80\x03\x02\x00\x04\x00\x00') + self.assertEqual(True, v._indefinite) + v2 = v.copy() + self.assertEqual(0, v2.method) + self.assertEqual(3, v2.tag) + self.assertEqual(False, v2._indefinite) + self.assertEqual((0, 0, 0, 0, 0, 1, 0, 0), v2.native) + self.assertEqual(b'\x03\x02\x00\x04', v2.dump()) + + v = core.OctetBitString.load(b'\x23\x80\x03\x02\x00\x04\x00\x00') + self.assertEqual(True, v._indefinite) + v2 = v.copy() + self.assertEqual(0, v2.method) + self.assertEqual(3, v2.tag) + self.assertEqual(False, v2._indefinite) + self.assertEqual(b'\x04', v2.native) + self.assertEqual(b'\x03\x02\x00\x04', v2.dump()) + + v = core.ParsableOctetBitString.load(b'\x23\x80\x03\x04\x00\x02\x01\x04\x00\x00') + self.assertEqual(4, v.parsed.native) + self.assertEqual(True, v._indefinite) + v2 = v.copy() + self.assertEqual(0, v2.method) + self.assertEqual(3, v2.tag) + self.assertEqual(False, v2._indefinite) + self.assertEqual(4, v2.parsed.native) + self.assertEqual(b'\x03\x04\x00\x02\x01\x04', v2.dump()) + + v = core.IntegerBitString.load(b'\x23\x80\x03\x02\x00\x04\x00\x00') + self.assertEqual(True, v._indefinite) + v2 = v.copy() + self.assertEqual(0, v2.method) + self.assertEqual(3, v2.tag) + self.assertEqual(False, v2._indefinite) + self.assertEqual(4, v2.native) + self.assertEqual(b'\x03\x02\x00\x04', v2.dump()) + + v = core.OctetString.load(b'\x24\x80\x04\x03foo\x00\x00') + self.assertEqual(True, v._indefinite) + v2 = v.copy() + self.assertEqual(0, v2.method) + self.assertEqual(4, v2.tag) + self.assertEqual(False, v2._indefinite) + self.assertEqual(b'foo', v2.native) + self.assertEqual(b'\x04\x03foo', v2.dump()) + + v = core.IntegerOctetString.load(b'\x24\x80\x04\x01\x04\x00\x00') + self.assertEqual(True, v._indefinite) + v2 = v.copy() + self.assertEqual(0, v2.method) + self.assertEqual(4, v2.tag) + self.assertEqual(False, v2._indefinite) + self.assertEqual(4, v2.native) + self.assertEqual(b'\x04\x01\x04', v2.dump()) + + v = core.ParsableOctetString.load(b'\x24\x80\x04\x03\x02\x01\x04\x00\x00') + self.assertEqual(4, v.parsed.native) + self.assertEqual(True, v._indefinite) + v2 = v.copy() + self.assertEqual(0, v2.method) + self.assertEqual(4, v2.tag) + self.assertEqual(False, v2._indefinite) + self.assertEqual(4, v2.parsed.native) + self.assertEqual(b'\x02\x01\x04', v2.__bytes__()) + self.assertEqual(b'\x04\x03\x02\x01\x04', v2.dump()) + + v = core.UTF8String.load(b'\x2C\x80\x0C\x03foo\x00\x00') + self.assertEqual(True, v._indefinite) + v2 = v.copy() + self.assertEqual(0, v2.method) + self.assertEqual(12, v2.tag) + self.assertEqual(False, v2._indefinite) + self.assertEqual('foo', v2.native) + self.assertEqual(b'\x0C\x03foo', v2.dump()) + def test_concat(self): child1 = Seq({ 'id': '1.2.3', @@ -619,6 +959,12 @@ class CoreTests(unittest.TestCase): with self.assertRaises(ValueError): MyOids.unmap('no_such_mapping') + def test_oid_dotted_native(self): + self.assertEqual('abc', MyOids('1.2.3').native) + self.assertEqual('1.2.3', MyOids('1.2.3').dotted) + self.assertEqual('abc', MyOids('abc').native) + self.assertEqual('1.2.3', MyOids('abc').dotted) + def test_dump_set(self): st = SetTest({'two': 2, 'one': 1}) self.assertEqual(b'1\x06\x81\x01\x01\x82\x01\x02', st.dump()) @@ -637,7 +983,8 @@ class CoreTests(unittest.TestCase): self.assertEqual(a._bytes, a.copy()._bytes) def test_indefinite_length_octet_string_2(self): - data = b'$\x80\x04\r\x8d\xff\xf0\x98\x076\xaf\x93nB:\xcf\xcc\x04\x15\x92w\xf7\xf0\xe4y\xff\xc7\xdc3\xb2\xd0={\x1a\x18mDr\xaaI\x00\x00' + data = b'$\x80\x04\r\x8d\xff\xf0\x98\x076\xaf\x93nB:\xcf\xcc\x04\x15' \ + b'\x92w\xf7\xf0\xe4y\xff\xc7\xdc3\xb2\xd0={\x1a\x18mDr\xaaI\x00\x00' a = core.OctetString.load(data) self.assertEqual( b'\x8d\xff\xf0\x98\x076\xaf\x93nB:\xcf\xcc\x92w\xf7\xf0\xe4y\xff\xc7\xdc3\xb2\xd0={\x1a\x18mDr\xaaI', @@ -686,25 +1033,109 @@ class CoreTests(unittest.TestCase): self.assertEqual(a._unicode, a.copy()._unicode) def test_indefinite_length_bit_string(self): - data = b'#\x80\x00\x03\x02\x00\x01\x03\x02\x02\x04\x00\x00' + data = b'#\x80\x03\x02\x00\x01\x03\x02\x02\x04\x00\x00' a = core.BitString.load(data) self.assertEqual((0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1), a.native) + self.assertEqual((0, 0), a.unused_bits) + + # Example from X.690 §8.6.4.2 + prim = core.BitString.load(b'\x03\x07\x04\x0A\x3B\x5F\x29\x1C\xD0') + self.assertEqual((0, 0, 0, 0), prim.unused_bits) + indef = core.BitString.load(b'\x23\x80\x03\x03\x00\x0a\x3b\x03\x05\x04\x5f\x29\x1c\xd0\x00\x00') + self.assertEqual(prim.native, indef.native) + self.assertEqual(core._int_to_bit_tuple(0x0A3B5F291CD, 44), indef.native) + self.assertEqual((0, 0, 0, 0), indef.unused_bits) + + unused = core.BitString.load(b'\x23\x80\x03\x03\x00\x0a\x3b\x03\x05\x04\x5f\x29\x1c\xdd\x00\x00') + self.assertEqual(indef.native, unused.native) + self.assertEqual((1, 1, 0, 1), unused.unused_bits) + + unused.set(indef.native) + self.assertEqual(indef.native, unused.native) + self.assertEqual((0, 0, 0, 0), unused.unused_bits) + + def test_integer_bit_string(self): + a = core.IntegerBitString.load(b'\x03\x02\x04\xcb') + self.assertEqual(12, a.native) + self.assertEqual((1, 0, 1, 1), a.unused_bits) + + b = a.copy() + self.assertEqual(12, b.native) + self.assertEqual((1, 0, 1, 1), b.unused_bits) + + a.set(56) + self.assertEqual((), a.unused_bits) + self.assertEqual(56, a.native) + self.assertEqual(b'\x03\x02\x00\x38', a.dump()) + + with self.assertRaises(TypeError): + a.set('badtype') + + with self.assertRaises(ValueError): + core.IntegerBitString(-1) def test_indefinite_length_integer_bit_string(self): - data = b'#\x80\x00\x03\x02\x00\x01\x03\x02\x00\x04\x00\x00' + data = b'#\x80\x03\x02\x00\x01\x03\x02\x00\x04\x00\x00' a = core.IntegerBitString.load(data) self.assertEqual(260, a.native) + self.assertEqual((), a.unused_bits) + + a = core.IntegerBitString.load(b'\x23\x80\x00\x00') + self.assertEqual(0, a.native) + self.assertEqual((), a.unused_bits) + + a = core.IntegerBitString.load(b'\x23\x80\x03\x01\x00\x03\x03\x03\x03\x03\x00\x00') + self.assertEqual(96, a.native) + self.assertEqual((0, 1, 1), a.unused_bits) + + a.set(56) + self.assertEqual((), a.unused_bits) + self.assertEqual(56, a.native) + self.assertEqual(b'\x03\x02\x00\x38', a.dump()) + + @data('bit_string_error_values') + def integer_bit_string_errors(self, enc_bytes): + with self.assertRaises(ValueError): + core.IntegerBitString.load(enc_bytes).native + + def test_octet_bit_string(self): + a = core.OctetBitString.load(b'\x03\x02\x04\xcb') + self.assertEqual(b'\xc0', a.native) + self.assertEqual((1, 0, 1, 1), a.unused_bits) + + a.set(b'\x38') + self.assertEqual((), a.unused_bits) + self.assertEqual(b'\x38', a.native) + self.assertEqual(b'\x03\x02\x00\x38', a.dump()) + + with self.assertRaises(TypeError): + a.set('badtype') def test_indefinite_length_octet_bit_string(self): - data = b'#\x80\x00\x03\x02\x00\x01\x03\x02\x00\x04\x00\x00' + data = b'#\x80\x03\x02\x00\x01\x03\x02\x00\x04\x00\x00' a = core.OctetBitString.load(data) self.assertEqual(b'\x01\x04', a.native) self.assertEqual(b'\x01\x04', a.__bytes__()) # Test copying moves internal state self.assertEqual(a._bytes, a.copy()._bytes) + # octet bit string with unused bits + a = core.OctetBitString.load(b'\x23\x80\x03\x05\x05\x74\x65\x73\x74\x00\x00') + self.assertEqual(b'\x74\x65\x73\x60', a.native) + self.assertEqual((1, 0, 1, 0, 0), a.unused_bits) + + a.set(b'\x38') + self.assertEqual((), a.unused_bits) + self.assertEqual(b'\x38', a.native) + self.assertEqual(b'\x03\x02\x00\x38', a.dump()) + + @data('bit_string_error_values') + def octet_bit_string_errors(self, enc_bytes): + with self.assertRaises(ValueError): + core.OctetBitString.load(enc_bytes).native + def test_indefinite_length_parsable_octet_bit_string(self): - data = b'#\x80\x00\x03\x03\x00\x0C\x02\x03\x03\x00\x61\x62\x00\x00' + data = b'#\x80\x03\x03\x00\x0C\x02\x03\x03\x00\x61\x62\x00\x00' a = core.ParsableOctetBitString.load(data) self.assertEqual(b'\x0C\x02\x61\x62', a.parsed.dump()) self.assertEqual(b'\x0C\x02\x61\x62', a.__bytes__()) @@ -714,6 +1145,20 @@ class CoreTests(unittest.TestCase): self.assertEqual(a._bytes, a.copy()._bytes) self.assertEqual(a._parsed, a.copy()._parsed) + with self.assertRaises(ValueError): + # parsable octet bit string with unused bits + core.ParsableOctetBitString.load(b'\x23\x80\x03\x03\x04\x02\x00\x03\x03\x04\x12\xa0\x00\x00').native + + def test_integer_octet_string(self): + v = core.IntegerOctetString(10) + self.assertEqual(10, v.native) + + with self.assertRaises(TypeError): + core.IntegerOctetString('0') + + with self.assertRaises(ValueError): + core.IntegerOctetString(-1) + def test_explicit_application_tag(self): data = b'\x6a\x81\x03\x02\x01\x00' ati = ApplicationTaggedInteger.load(data) @@ -775,3 +1220,108 @@ class CoreTests(unittest.TestCase): self.assertEqual(42, inum.native) self.assertEqual(der, ato.dump(force=True)) + + def test_sequence_choice_field_by_tuple(self): + val = ExplicitField({'field': ('one', 32)}) + self.assertEqual('one', val['field'].name) + self.assertEqual(32, val['field'].chosen.native) + + def test_sequence_choice_field_by_dict(self): + val = ExplicitField({'field': {'two': 32}}) + self.assertEqual('two', val['field'].name) + self.assertEqual(32, val['field'].chosen.native) + + def test_nested_explicit_tag_choice(self): + # Explicitly tagged values have a _header that contains + # the explicit tag and the header for the contained value. + # When parsing nested Choice values, it is necessary to not pull + # up the next Choice value's header, since Choice values + # themselves don't have their own header and it will result in + # duplication. + data = b'\x30\x09\x03\x01\x00\xa0\x04\xa2\x02\x80\x00' + image_data = SpcPeImageData.load(data, strict=True) + self.assertEqual(data[2:5], image_data['flags'].dump()) + self.assertEqual(data[5:11], image_data['file'].dump()) + self.assertEqual(data[5:7], image_data['file']._header) + self.assertEqual(data[7:11], image_data['file'].chosen.dump()) + self.assertEqual(data[7:9], image_data['file'].chosen._header) + self.assertEqual(data[9:11], image_data['file'].chosen.chosen.dump()) + self.assertEqual(data[9:11], image_data['file'].chosen.chosen._header) + + image_data2 = SpcPeImageData.load(data, strict=True) + self.assertEqual(data[2:5], image_data2['flags'].dump(True)) + self.assertEqual(data[5:11], image_data2['file'].dump(True)) + self.assertEqual(data[5:7], image_data2['file']._header) + self.assertEqual(data[7:11], image_data2['file'].chosen.dump(True)) + self.assertEqual(data[7:9], image_data2['file'].chosen._header) + self.assertEqual(data[9:11], image_data2['file'].chosen.chosen.dump(True)) + self.assertEqual(data[9:11], image_data2['file'].chosen.chosen._header) + + def test_choice_dump_header_native(self): + s = SpcString({'unicode': 'test'}) + self.assertEqual(b'\x80\x08\x00t\x00e\x00s\x00t', s.dump()) + self.assertEqual(b'', s._header) + self.assertEqual('test', s.native) + self.assertEqual(b'\x80\x08', s.chosen._header) + self.assertEqual('test', s.chosen.native) + + link = SpcLink('file', {'unicode': 'test'}) + self.assertEqual(b'\xa2\x0a\x80\x08\x00t\x00e\x00s\x00t', link.dump()) + self.assertEqual(b'', link._header) + self.assertEqual('test', link.native) + self.assertEqual(b'\xa2\x0a', link.chosen._header) + self.assertEqual('test', link.chosen.native) + self.assertEqual(b'\x80\x08', link.chosen.chosen._header) + self.assertEqual('test', link.chosen.chosen.native) + + def test_parse_broken_sequence_fields_repeatedly(self): + s = Seq.load(b'\x30\x06\x88\x00\x00\x00\x00\x00') + with self.assertRaises(ValueError): + s.native + with self.assertRaises(ValueError): + s.native + + def test_parse_broken_sequenceof_children_repeatedly(self): + s = SequenceOfInts.load(b'\x30\x06\x88\x00\x00\x00\x00\x00') + with self.assertRaises(ValueError): + s.native + with self.assertRaises(ValueError): + s.native + + def test_wrong_asn1value(self): + with self.assertRaises(TypeError): + Seq({ + 'id': core.Integer(1), + 'value': 1 + }) + + def test_wrong_asn1value2(self): + with self.assertRaises(TypeError): + CopySeq({ + 'name': core.UTF8String('Test'), + 'pair': core.Integer(1) + }) + + def test_wrong_asn1value3(self): + with self.assertRaises(TypeError): + NestSeqAny({ + 'id': '3.4.5', + 'value': core.Integer(1) + }) + + def test_wrong_asn1value4(self): + with self.assertRaises(TypeError): + NestSeqExplicit({ + 'id': '3.4.5', + 'value': core.Integer(1) + }) + + def test_integer_octet_string_encoded_width(self): + a = core.IntegerOctetString(1) + self.assertEqual(1, a.native) + self.assertEqual(b'\x04\x01\x01', a.dump()) + + b = core.IntegerOctetString(1) + b.set_encoded_width(4) + self.assertEqual(1, b.native) + self.assertEqual(b'\x04\x04\x00\x00\x00\x01', b.dump()) diff --git a/tests/test_keys.py b/tests/test_keys.py index 98f9d30..2f2856e 100644 --- a/tests/test_keys.py +++ b/tests/test_keys.py @@ -299,6 +299,94 @@ class KeysTests(unittest.TestCase): key_info['attributes'].native ) + def test_parse_rsapss_private_key(self): + with open(os.path.join(fixtures_dir, 'keys/test-rsapss-der.key'), 'rb') as f: + key_info = keys.PrivateKeyInfo.load(f.read()) + + key = key_info['private_key'].parsed + + self.assertEqual( + 0, + key_info['version'].native + ) + self.assertEqual( + 'rsassa_pss', + key_info['private_key_algorithm']['algorithm'].native + ) + self.assertEqual( + None, + key_info['private_key_algorithm']['parameters'].native + ) + + self.assertEqual( + 'two-prime', + key['version'].native + ) + self.assertEqual( + 20334810015710919160110203472269180092101382951468058535601491502957196266577250503666807938732810152931665713052098820680792829137564325868564844098687045650387144565108903086036194735310494097581552241575174798917880615962200904076841064384200149608953782976948109759991080721261141139715447415148530436086884795768009560076896590825433136132086023781159444716805738553676228393667377624295683128237093827752550284339271476658714835879903906034493875531632793284572104031230158276531850092876884395075794398068537347947800593962574809516836581297669594643468201529164877789603529698620577572178907861813134904392181, # noqa + key['modulus'].native + ) + self.assertEqual( + 65537, + key['public_exponent'].native + ) + self.assertEqual( + 19811367921985171557639752989981035886303512541789150212828710994763522615025976847568941008714007785902419332260807020468874408966438534060269241736746690644631569655037665166904359886012100769497873119376457740069070560586943676477505866318738720913860857882999478282122015106772111353446622784949473859714808146533832277397219231218258638918521475883551912394494264506377559745603922894963456171825545032908365582944199734667178542763963194351614183530759037228600105514522819433425764227915014375970397879315537366008672232442295229043876987446583754589361036423305704619726617664187630589314612553217357586095673, # noqa + key['private_exponent'].native + ) + self.assertEqual( + 145062186227663059634108314593892541355080853648164075820395373006330022883408993468365984286369578851636705799765757665015182142763055043654284213839887910732213256250809510746337738407165996181392718941356683486810092456676083857188565619344293262177288309348259896401807590237461717880393098789423620650939, # noqa + key['prime1'].native + ) + self.assertEqual( + 140179950023620372289001596962713930540779028054089057618536399863850868080064249195053602322991362108187576825895413419966213531630187432159266399149913629896819277637422106295703267471029328291865017941552279870382011332512626586060449095917164740367589115287472025339179557750935025294415109144213020312079, # noqa + key['prime2'].native + ) + self.assertEqual( + 118228658851708114001194157738654137417646348120344781510758784408198602961600439097293142570946864897406396441532083859790972106955549111215800799518497533665722246507785513633594518505277393228754912332478232018012333162654627815552589285314495327920681107702945726939074883271186966123919571825659906212509, # noqa + key['exponent1'].native + ) + self.assertEqual( + 56878789554421364113540907677075374840783006759759162308194149033058002105452927576710337564627405910873614034121348759689054278241450542380322750296695046251983127560528078041645807537568272852545501885984378691627606471980343411760066258123338644976958508227786686876412756148631524064712858116223089798721, # noqa + key['exponent2'].native + ) + self.assertEqual( + 23480707628058872067473220975854826046220552607063059593257976510053338333806071359463231176605785818753563067398907246278690942690250152695883594601176151883590956534074071491193074275985805378044282321604348476199853682247297755042167691612551582210509658456585074900583647465600111554502893125233815233234, # noqa + key['coefficient'].native + ) + self.assertEqual( + None, + key['other_prime_infos'].native + ) + + self.assertEqual( + None, + key_info['attributes'].native + ) + + def test_parse_rsapss_public_key_info(self): + with open(os.path.join(fixtures_dir, 'keys/test-public-rsapss-der.key'), 'rb') as f: + key = keys.PublicKeyInfo.load(f.read()) + + public_key = key['public_key'].parsed + + self.assertEqual( + 'rsassa_pss', + key['algorithm']['algorithm'].native + ) + self.assertEqual( + None, + key['algorithm']['parameters'].native + ) + self.assertEqual( + 20334810015710919160110203472269180092101382951468058535601491502957196266577250503666807938732810152931665713052098820680792829137564325868564844098687045650387144565108903086036194735310494097581552241575174798917880615962200904076841064384200149608953782976948109759991080721261141139715447415148530436086884795768009560076896590825433136132086023781159444716805738553676228393667377624295683128237093827752550284339271476658714835879903906034493875531632793284572104031230158276531850092876884395075794398068537347947800593962574809516836581297669594643468201529164877789603529698620577572178907861813134904392181, # noqa + public_key['modulus'].native + ) + self.assertEqual( + 65537, + public_key['public_exponent'].native + ) + @staticmethod def key_sha1_hashes(): return ( @@ -377,42 +465,6 @@ class KeysTests(unittest.TestCase): ) @data('key_pairs', True) - def compare_fingerprints(self, private_key_file, public_key_file, *_): - with open(os.path.join(fixtures_dir, private_key_file), 'rb') as f: - private_key = keys.PrivateKeyInfo.load(f.read()) - with open(os.path.join(fixtures_dir, public_key_file), 'rb') as f: - public_key = keys.PublicKeyInfo.load(f.read()) - - self.assertEqual(private_key.fingerprint, public_key.fingerprint) - - @data('key_pairs', True) - def compute_public_key(self, private_key_file, public_key_file, *_): - with open(os.path.join(fixtures_dir, private_key_file), 'rb') as f: - private_key = keys.PrivateKeyInfo.load(f.read()) - with open(os.path.join(fixtures_dir, public_key_file), 'rb') as f: - public_key = keys.PublicKeyInfo.load(f.read()) - - self.assertEqual(public_key['public_key'].native, private_key._compute_public_key().native) - - @data('key_pairs', True) - def public_key_property(self, private_key_file, public_key_file, *_): - with open(os.path.join(fixtures_dir, private_key_file), 'rb') as f: - private_key = keys.PrivateKeyInfo.load(f.read()) - with open(os.path.join(fixtures_dir, public_key_file), 'rb') as f: - public_key = keys.PublicKeyInfo.load(f.read()) - - self.assertEqual(public_key['public_key'].native, private_key.public_key.native) - - @data('key_pairs', True) - def public_key_info_property(self, private_key_file, public_key_file, *_): - with open(os.path.join(fixtures_dir, private_key_file), 'rb') as f: - private_key = keys.PrivateKeyInfo.load(f.read()) - with open(os.path.join(fixtures_dir, public_key_file), 'rb') as f: - public_key = keys.PublicKeyInfo.load(f.read()) - - self.assertEqual(public_key.dump(), private_key.public_key_info.dump()) - - @data('key_pairs', True) def algorithm_name(self, private_key_file, public_key_file, algorithm, _): with open(os.path.join(fixtures_dir, private_key_file), 'rb') as f: private_key = keys.PrivateKeyInfo.load(f.read()) @@ -457,15 +509,6 @@ class KeysTests(unittest.TestCase): ), ) - @data('key_variations', True) - def unwrap(self, wrapped_private_key_file, unwrapped_private_key_file): - with open(os.path.join(fixtures_dir, wrapped_private_key_file), 'rb') as f: - private_key = keys.PrivateKeyInfo.load(f.read()) - with open(os.path.join(fixtures_dir, unwrapped_private_key_file), 'rb') as f: - unwrapped_bytes = f.read() - - self.assertEqual(unwrapped_bytes, private_key.unwrap().dump()) - def test_curve_invalid(self): with open(os.path.join(fixtures_dir, 'keys/test-pkcs8-der.key'), 'rb') as f: private_key = keys.PrivateKeyInfo.load(f.read()) @@ -548,3 +591,79 @@ class KeysTests(unittest.TestCase): public_key = keys.PublicKeyInfo.load(f.read()) self.assertEqual(curve, public_key.curve) + + def test_named_curve_register(self): + keys.NamedCurve.register('customcurve', '1.2.3.4.5.6.7.8', 16) + + k = keys.NamedCurve('customcurve') + self.assertEqual('customcurve', k.native) + self.assertEqual('1.2.3.4.5.6.7.8', k.dotted) + + k = keys.ECPrivateKey({ + 'version': 1, + 'private_key': 1, + 'parameters': keys.ECDomainParameters(('named', 'customcurve')), + }) + + self.assertEqual('ecPrivkeyVer1', k['version'].native) + self.assertEqual(1, k['private_key'].native) + self.assertEqual('customcurve', k['parameters'].native) + self.assertEqual( + b'\x04\x10\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01', + k['private_key'].dump() + ) + + def test_ec_private_key_width(self): + k = keys.ECPrivateKey({ + 'version': 1, + 'private_key': 1, + 'parameters': keys.ECDomainParameters(('named', 'secp256r1')), + }) + + self.assertEqual('ecPrivkeyVer1', k['version'].native) + self.assertEqual(1, k['private_key'].native) + self.assertEqual('secp256r1', k['parameters'].native) + self.assertEqual( + b'\x04\x20\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01', + k['private_key'].dump() + ) + + def test_ec_private_key_width_dotted(self): + k = keys.ECPrivateKey({ + 'version': 1, + 'private_key': 1, + 'parameters': keys.ECDomainParameters(('named', '1.3.132.0.10')), + }) + + self.assertEqual('ecPrivkeyVer1', k['version'].native) + self.assertEqual(1, k['private_key'].native) + self.assertEqual('secp256k1', k['parameters'].native) + self.assertEqual( + b'\x04\x20\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01', + k['private_key'].dump() + ) + + def test_ec_private_key_info_width(self): + pki = keys.PrivateKeyInfo({ + 'version': 0, + 'private_key_algorithm': { + 'algorithm': 'ec', + 'parameters': ('named', 'secp256r1'), + }, + 'private_key': { + 'version': 1, + 'private_key': 1 + } + }) + + k = pki['private_key'].parsed + self.assertEqual('ecPrivkeyVer1', k['version'].native) + self.assertEqual(1, k['private_key'].native) + self.assertEqual(None, k['parameters'].native) + self.assertEqual( + b'\x04\x20\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01', + k['private_key'].dump() + ) diff --git a/tests/test_ocsp.py b/tests/test_ocsp.py index c3492c1..3882112 100644 --- a/tests/test_ocsp.py +++ b/tests/test_ocsp.py @@ -153,3 +153,31 @@ class OCSPTests(unittest.TestCase): 'v3', cert['tbs_certificate']['version'].native ) + + def test_cert_status_native(self): + status = ocsp.CertStatus.load(b'\x80\x00') + self.assertEqual('good', status.native) + + status = ocsp.CertStatus(('good', ocsp.StatusGood())) + self.assertEqual('good', status.native) + + with self.assertRaises(ValueError): + ocsp.StatusGood('unknown') + + status = ocsp.CertStatus.load( + b'\xa1\x16\x18\x0f\x32\x30\x31\x38\x31\x30\x30\x33' + b'\x31\x34\x35\x33\x34\x37\x5a\xa0\x03\x0a\x01\x01' + ) + self.assertIsInstance( + status.native, + util.OrderedDict + ) + + status = ocsp.CertStatus.load(b'\x82\x00') + self.assertEqual('unknown', status.native) + + status = ocsp.CertStatus(('unknown', ocsp.StatusUnknown())) + self.assertEqual('unknown', status.native) + + with self.assertRaises(ValueError): + ocsp.StatusUnknown('good') diff --git a/tests/test_parser.py b/tests/test_parser.py index b661c33..4148a84 100644 --- a/tests/test_parser.py +++ b/tests/test_parser.py @@ -60,3 +60,31 @@ class ParserTests(unittest.TestCase): with self.assertRaises(TypeError): parser.emit(0, 0, 2, '\x00') + + def test_parser_large_tag(self): + # One extra byte + result = parser.parse(b'\x7f\x49\x00') + self.assertEqual(1, result[0]) + self.assertEqual(1, result[1]) + self.assertEqual(73, result[2]) + self.assertEqual(b'\x7f\x49\x00', result[3]) + self.assertEqual(b'', result[4]) + self.assertEqual(b'', result[5]) + + # Two extra bytes + result = parser.parse(b'\x7f\x81\x49\x00') + self.assertEqual(1, result[0]) + self.assertEqual(1, result[1]) + self.assertEqual(201, result[2]) + self.assertEqual(b'\x7f\x81\x49\x00', result[3]) + self.assertEqual(b'', result[4]) + self.assertEqual(b'', result[5]) + + # Three extra bytes + result = parser.parse(b'\x7f\x81\x80\x00\x00') + self.assertEqual(1, result[0]) + self.assertEqual(1, result[1]) + self.assertEqual(16384, result[2]) + self.assertEqual(b'\x7f\x81\x80\x00\x00', result[3]) + self.assertEqual(b'', result[4]) + self.assertEqual(b'', result[5]) diff --git a/tests/test_pem.py b/tests/test_pem.py index 34a1498..8d7f274 100644 --- a/tests/test_pem.py +++ b/tests/test_pem.py @@ -143,6 +143,9 @@ class PEMTests(unittest.TestCase): encoded_bytes = pem.armor(type_name, byte_string, headers=headers) with open(os.path.join(fixtures_dir, expected_bytes_filename), 'rb') as f: expected_bytes = f.read() + # In case a user on Windows has CRLF translation on in Git. + # Ran into this with the GitHub Actions Windows environments. + expected_bytes = expected_bytes.replace(b'\r\n', b'\n') self.assertEqual(expected_bytes, encoded_bytes) def test_armor_wrong_type(self): diff --git a/tests/test_pkcs12.py b/tests/test_pkcs12.py index ffd6398..41f9867 100644 --- a/tests/test_pkcs12.py +++ b/tests/test_pkcs12.py @@ -3,11 +3,9 @@ from __future__ import unicode_literals, division, absolute_import, print_functi import unittest import os -import zlib import sys -from datetime import datetime -from asn1crypto import pkcs12, util, core +from asn1crypto import pkcs12 from ._unittest_compat import patch patch() @@ -104,7 +102,6 @@ class PKCS12Tests(unittest.TestCase): attr_0['values'].native ) - attr_1 = certbag['bag_attributes'][1] self.assertEqual( diff --git a/tests/test_util.py b/tests/test_util.py index a3f3e6e..a5fb5e3 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -4,11 +4,11 @@ from __future__ import unicode_literals, division, absolute_import, print_functi import unittest import sys import os -from datetime import date, datetime, time +from datetime import date, datetime, time, timedelta from asn1crypto import util -from .unittest_data import data_decorator, data +from .unittest_data import data_decorator from ._unittest_compat import patch patch() @@ -31,25 +31,199 @@ utc = util.timezone.utc @data_decorator class UtilTests(unittest.TestCase): + def test_int_to_bytes(self): + self.assertEqual(util.int_to_bytes(0, False, 0), b'') + self.assertEqual(util.int_to_bytes(0, False), b'\x00') + self.assertEqual(util.int_to_bytes(0, False, 3), b'\x00\x00\x00') + self.assertEqual(util.int_to_bytes(0, True, 0), b'') + self.assertEqual(util.int_to_bytes(0, True), b'\x00') + self.assertEqual(util.int_to_bytes(0, True, 3), b'\x00\x00\x00') + + self.assertEqual(util.int_to_bytes(128, False), b'\x80') + self.assertEqual(util.int_to_bytes(128, False, 3), b'\x00\x00\x80') + self.assertEqual(util.int_to_bytes(-128, True), b'\x80') + self.assertEqual(util.int_to_bytes(-128, True, 3), b'\xff\xff\x80') + + self.assertEqual(util.int_to_bytes(255, False), b'\xff') + self.assertEqual(util.int_to_bytes(255, False, 3), b'\x00\x00\xff') + self.assertEqual(util.int_to_bytes(-1, True), b'\xff') + self.assertEqual(util.int_to_bytes(-1, True, 3), b'\xff\xff\xff') + + self.assertEqual(util.int_to_bytes(12345678, False), b'\xbc\x61\x4e') + self.assertEqual(util.int_to_bytes(12345678, False, 3), b'\xbc\x61\x4e') + self.assertEqual(util.int_to_bytes(12345678, False, 5), b'\x00\x00\xbc\x61\x4e') + self.assertEqual(util.int_to_bytes(12345678 - 2 ** 24, True), b'\xbc\x61\x4e') + self.assertEqual(util.int_to_bytes(12345678 - 2 ** 24, True, 3), b'\xbc\x61\x4e') + self.assertEqual(util.int_to_bytes(12345678 - 2 ** 24, True, 5), b'\xff\xff\xbc\x61\x4e') + + with self.assertRaises(OverflowError): + util.int_to_bytes(123456789, width=3) + with self.assertRaises(OverflowError): + util.int_to_bytes(50000, signed=True, width=2) + + def test_int_from_bytes(self): + self.assertEqual(util.int_from_bytes(b'', False), 0) + self.assertEqual(util.int_from_bytes(b'', True), 0) + self.assertEqual(util.int_from_bytes(b'\x00', False), 0) + self.assertEqual(util.int_from_bytes(b'\x00', True), 0) + self.assertEqual(util.int_from_bytes(b'\x80', False), 128) + self.assertEqual(util.int_from_bytes(b'\x80', True), -128) + self.assertEqual(util.int_from_bytes(b'\xff', False), 255) + self.assertEqual(util.int_from_bytes(b'\xff', True), -1) + self.assertEqual(util.int_from_bytes(b'\xbc\x61\x4e', False), 12345678) + self.assertEqual(util.int_from_bytes(b'\xbc\x61\x4e', True), 12345678 - 2 ** 24) + + def test_int_fromto_bytes(self): + for i in range(-300, 301): + self.assertEqual(i, util.int_from_bytes(util.int_to_bytes(i, True), True)) + for i in range(0, 301): + self.assertEqual(i, util.int_from_bytes(util.int_to_bytes(i, False), False)) + + def test_timezone(self): + delta_plus_5_42 = timedelta(hours=5, minutes=42) + delta_minus_5_42 = -delta_plus_5_42 + + # limited to +24h + with self.assertRaises(ValueError): + util.timezone(delta_plus_5_42 * 5) + + # limited to -24h + with self.assertRaises(ValueError): + util.timezone(delta_minus_5_42 * 5) + + # py2 implementation supports no sub-minutes time zones + if py2: + with self.assertRaises(ValueError): + util.timezone(timedelta(hours=5, minutes=42, seconds=13)) + + with self.assertRaises(ValueError): + util.timezone(timedelta(hours=5, minutes=42, microseconds=13)) + + # test __eq__ + tz0 = util.timezone(delta_plus_5_42) + tz1 = util.timezone(delta_minus_5_42) + self.assertEqual(tz0, tz0) + self.assertEqual(tz1, tz1) + self.assertNotEqual(tz0, tz1) + self.assertFalse(tz0 == "not equal to a str") + + # test tzname + self.assertEqual('5_42', util.timezone(delta_plus_5_42, '5_42').tzname(None)) + self.assertEqual('UTC+05:42', util.timezone(delta_plus_5_42).tzname(None)) + self.assertEqual('UTC-05:42', util.timezone(delta_minus_5_42).tzname(None)) + if py2 or sys.version_info >= (3, 6): + # bpo22241 + self.assertEqual('UTC', util.timezone(timedelta(0)).tzname(None)) + + # test utcoffset + self.assertEqual(delta_minus_5_42, util.timezone(delta_minus_5_42).utcoffset(None)) + + # test dst + self.assertTrue(util.timezone(delta_minus_5_42).dst(None) in set((timedelta(0), None))) + + # test create_timezone + self.assertTrue(util.create_timezone(delta_plus_5_42) is util.create_timezone(timedelta(hours=5, minutes=42))) + self.assertFalse(util.create_timezone(delta_plus_5_42) is util.create_timezone(delta_minus_5_42)) + + def test_utc_with_dst(self): + self.assertEqual('UTC', util.utc_with_dst.tzname(None)) + def test_extended_date_strftime(self): self.assertEqual('0000-01-01', util.extended_date(0, 1, 1).strftime('%Y-%m-%d')) self.assertEqual('Sat Saturday Jan January', util.extended_date(0, 1, 1).strftime('%a %A %b %B')) self.assertEqual('Tue Tuesday Feb February 29', util.extended_date(0, 2, 29).strftime('%a %A %b %B %d')) - if sys.platform == 'win32': + if sys.platform == 'win32' and sys.version_info < (3, 5): self.assertEqual('01/01/00 00:00:00', util.extended_date(0, 1, 1).strftime('%c')) else: self.assertEqual('Sat Jan 1 00:00:00 0000', util.extended_date(0, 1, 1).strftime('%c')) self.assertEqual('01/01/00', util.extended_date(0, 1, 1).strftime('%x')) + def test_extended_datetime_init(self): + with self.assertRaises(ValueError): + util.extended_datetime(2000, 11, 27) + + def test_extended_date_init(self): + with self.assertRaises(ValueError): + util.extended_date(2000, 11, 27) + + def test_extended_datetime_properties(self): + zone = util.create_timezone(timedelta(hours=12, minutes=45)) + dt = util.extended_datetime(0, 11, 27, 5, 44, 31, 14889, zone) + self.assertEqual(dt.year, 0) + self.assertEqual(dt.month, 11) + self.assertEqual(dt.day, 27) + self.assertEqual(dt.hour, 5) + self.assertEqual(dt.minute, 44) + self.assertEqual(dt.second, 31) + self.assertEqual(dt.microsecond, 14889) + self.assertEqual(dt.tzinfo, zone) + + def test_extended_date_properties(self): + ext_date = util.extended_date(0, 11, 27) + self.assertEqual(ext_date.year, 0) + self.assertEqual(ext_date.month, 11) + self.assertEqual(ext_date.day, 27) + + def test_extended_datetime_isoformat(self): + self.assertEqual('0000-01-01T00:00:00', util.extended_datetime(0, 1, 1).isoformat()) + self.assertEqual('0000-01-01T00:00:00.001000', util.extended_datetime(0, 1, 1, microsecond=1000).isoformat()) + self.assertEqual('0000-01-01%00:00:00', util.extended_datetime(0, 1, 1).isoformat(sep='%')) + + def test_extended_date_isoformat(self): + self.assertEqual('0000-01-01', util.extended_date(0, 1, 1).isoformat()) + self.assertEqual('0000-11-27', util.extended_date(0, 11, 27).isoformat()) + def test_extended_datetime_strftime(self): self.assertEqual('0000-01-01 00:00:00', util.extended_datetime(0, 1, 1).strftime('%Y-%m-%d %H:%M:%S')) self.assertEqual('Sat Saturday Jan January', util.extended_datetime(0, 1, 1).strftime('%a %A %b %B')) self.assertEqual('Tue Tuesday Feb February 29', util.extended_datetime(0, 2, 29).strftime('%a %A %b %B %d')) - if sys.platform == 'win32': + if sys.platform == 'win32' and sys.version_info < (3, 5): self.assertEqual('01/01/00 00:00:00', util.extended_datetime(0, 1, 1).strftime('%c')) else: self.assertEqual('Sat Jan 1 00:00:00 0000', util.extended_datetime(0, 1, 1).strftime('%c')) self.assertEqual('01/01/00', util.extended_datetime(0, 1, 1).strftime('%x')) + self.assertEqual('%Y', util.extended_datetime(0, 1, 1).strftime('%%Y')) + + def test_extended_datetime_replace(self): + zone = util.create_timezone(timedelta(hours=12, minutes=45)) + ext_dt = util.extended_datetime(0, 1, 1, 23, tzinfo=zone) + self.assertEqual(ext_dt.replace(year=2040, minute=59), datetime(2040, 1, 1, 23, 59, tzinfo=zone)) + self.assertEqual(ext_dt.replace(minute=59), util.extended_datetime(0, 1, 1, 23, 59, tzinfo=zone)) + + def test_extended_date_replace(self): + ext_date = util.extended_date(0, 2, 27) + self.assertEqual(ext_date.replace(year=2040), date(2040, 2, 27)) + self.assertEqual(ext_date.replace(day=29), util.extended_date(0, 2, 29)) + with self.assertRaises(ValueError): + ext_date.replace(day=30) + + def test_extended_datetime_encodings(self): + zone = util.create_timezone(timedelta(hours=12, minutes=45)) + + # test with microseconds + ext_dt = util.extended_datetime(0, 2, 29, 9, 17, 45, 14889, zone) + self.assertEqual(str(ext_dt), '0000-02-29 09:17:45.014889+12:45') + if py2: + self.assertEqual(unicode(ext_dt), '0000-02-29 09:17:45.014889+12:45') # noqa: F821 + + # test without microseconds + ext_dt = util.extended_datetime(0, 2, 29, 9, 17, 45, 0, zone) + self.assertEqual(str(ext_dt), '0000-02-29 09:17:45+12:45') + if py2: + self.assertEqual(unicode(ext_dt), '0000-02-29 09:17:45+12:45') # noqa: F821 + + def test_extended_date_encodings(self): + ext_date = util.extended_date(0, 2, 29) + self.assertEqual(str(ext_date), '0000-02-29') + if py2: + self.assertEqual(unicode(ext_date), '0000-02-29') # noqa: F821 + + def test_extended_datetime_timestamp(self): + if sys.version_info >= (3, 3): + zone = util.create_timezone(timedelta(hours=12, minutes=45)) + ext_dt = util.extended_datetime(0, 12, 31, 23, 0, 0, 14889, zone) + dt = datetime(1, 1, 1, 0, 0, 0, 14889, zone) + self.assertTrue(abs(dt.timestamp() - ext_dt.timestamp() - 3600.0) < 0.0000001) def test_extended_date_compare(self): self.assertTrue(util.extended_date(0, 1, 1) < date(1, 1, 1)) @@ -80,6 +254,9 @@ class UtilTests(unittest.TestCase): self.assertTrue(util.extended_date(0, 1, 3) >= util.extended_date(0, 1, 2)) self.assertTrue(util.extended_date(0, 1, 3) > util.extended_date(0, 1, 2)) + with self.assertRaises(TypeError): + util.extended_date(0, 1, 1) < "0000-01-02" + def test_extended_datetime_compare(self): self.assertTrue(util.extended_datetime(0, 1, 1) < datetime(1, 1, 1)) self.assertTrue(util.extended_datetime(0, 1, 1) <= datetime(1, 1, 1)) @@ -108,6 +285,43 @@ class UtilTests(unittest.TestCase): self.assertFalse(util.extended_datetime(0, 1, 3) == util.extended_datetime(0, 1, 2)) self.assertTrue(util.extended_datetime(0, 1, 3) >= util.extended_datetime(0, 1, 2)) self.assertTrue(util.extended_datetime(0, 1, 3) > util.extended_datetime(0, 1, 2)) + self.assertTrue( + util.extended_datetime(0, 12, 31, 21, 4, 5, 6, util.create_timezone(timedelta(hours=-8))) + == datetime(1, 1, 1, 5, 4, 5, 6, utc) + ) + self.assertTrue( + util.extended_datetime(0, 12, 31, 21, 4, 5, 6, util.create_timezone(timedelta(hours=-8))) + == datetime(1, 1, 1, 5, 7, 5, 6, util.create_timezone(timedelta(hours=0, minutes=3))) + ) + self.assertFalse( + util.extended_datetime(0, 12, 31, 21, 4, 5, 6, util.create_timezone(timedelta(hours=-7))) + == datetime(1, 1, 1, 5, 4, 5, 6, utc) + ) + self.assertFalse(util.extended_datetime(0, 1, 1) == util.extended_datetime(0, 1, 1, tzinfo=utc)) + self.assertFalse(util.extended_datetime(0, 1, 1) == "0000-01-01") + + with self.assertRaises(TypeError): + util.extended_datetime(0, 1, 1) < "0000-01-02" + + def test_extended_datetime_arithmetic(self): + zone = util.create_timezone(timedelta(hours=12, minutes=45)) + ext_dt = util.extended_datetime(0, 12, 31, 9, 17, 45, 14889, zone) + self.assertEqual(ext_dt + timedelta(hours=20), datetime(1, 1, 1, 5, 17, 45, 14889, zone)) + self.assertEqual(ext_dt - timedelta(hours=20), util.extended_datetime(0, 12, 30, 13, 17, 45, 14889, zone)) + self.assertEqual(ext_dt - ext_dt, timedelta(0)) + + zone2 = util.create_timezone(timedelta(hours=-8, minutes=-31)) + ext_dt2 = util.extended_datetime(0, 11, 14, 13, 44, 20, 876543, zone2) + expected_diff = timedelta(days=47, hours=-4, minutes=-27, seconds=25, microseconds=-861654) + expected_diff -= timedelta(hours=20, minutes=76) + self.assertEqual(ext_dt - ext_dt2, expected_diff) + + dt = datetime(400, 12, 31, 9, 17, 45, 14889, zone) + self.assertEqual(dt - ext_dt, timedelta(days=util.extended_datetime.DAYS_IN_400_YEARS)) + self.assertEqual(ext_dt - dt, -timedelta(days=util.extended_datetime.DAYS_IN_400_YEARS)) + + with self.assertRaises(TypeError): + ext_dt - "test" def test_extended_datetime_compare_tzinfo(self): with self.assertRaises(TypeError): @@ -122,14 +336,26 @@ class UtilTests(unittest.TestCase): def test_iri_to_uri(self): self.assertEqual( - b'ldap://ldap.e-szigno.hu/CN=Microsec%20e-Szigno%20Root%20CA,OU=e-Szigno%20CA,O=Microsec%20Ltd.,L=Budapest,C=HU?certificateRevocationList;binary', - util.iri_to_uri('ldap://ldap.e-szigno.hu/CN=Microsec e-Szigno Root CA,OU=e-Szigno CA,O=Microsec Ltd.,L=Budapest,C=HU?certificateRevocationList;binary') + b'ldap://ldap.e-szigno.hu/CN=Microsec%20e-Szigno%20Root%20CA,OU=e-Szigno%20CA,' + b'O=Microsec%20Ltd.,L=Budapest,C=HU?certificateRevocationList;binary', + util.iri_to_uri( + 'ldap://ldap.e-szigno.hu/CN=Microsec e-Szigno Root CA,' + 'OU=e-Szigno CA,O=Microsec Ltd.,L=Budapest,C=HU?certificateRevocationList;binary' + ) ) self.assertEqual( - b'ldap://directory.d-trust.net/CN=D-TRUST%20Root%20Class%203%20CA%202%202009,O=D-Trust%20GmbH,C=DE?certificaterevocationlist', - util.iri_to_uri('ldap://directory.d-trust.net/CN=D-TRUST Root Class 3 CA 2 2009,O=D-Trust GmbH,C=DE?certificaterevocationlist') + b'ldap://directory.d-trust.net/CN=D-TRUST%20Root%20Class%203%20CA%202%202009,' + b'O=D-Trust%20GmbH,C=DE?certificaterevocationlist', + util.iri_to_uri( + 'ldap://directory.d-trust.net/CN=D-TRUST Root Class 3 CA 2 2009,' + 'O=D-Trust GmbH,C=DE?certificaterevocationlist' + ) ) self.assertEqual( - b'ldap://directory.d-trust.net/CN=D-TRUST%20Root%20Class%203%20CA%202%20EV%202009,O=D-Trust%20GmbH,C=DE?certificaterevocationlist', - util.iri_to_uri('ldap://directory.d-trust.net/CN=D-TRUST Root Class 3 CA 2 EV 2009,O=D-Trust GmbH,C=DE?certificaterevocationlist') + b'ldap://directory.d-trust.net/CN=D-TRUST%20Root%20Class%203%20CA%202%20EV%202009,' + b'O=D-Trust%20GmbH,C=DE?certificaterevocationlist', + util.iri_to_uri( + 'ldap://directory.d-trust.net/CN=D-TRUST Root Class 3 CA 2 EV 2009,' + 'O=D-Trust GmbH,C=DE?certificaterevocationlist' + ) ) diff --git a/tests/test_x509.py b/tests/test_x509.py index f06ab9b..f933911 100644 --- a/tests/test_x509.py +++ b/tests/test_x509.py @@ -176,6 +176,20 @@ class X509Tests(unittest.TestCase): self.assertEqual('https://example.com', u.__unicode__()) self.assertEqual(b'\x16\x13https://example.com', u.dump()) + def test_uri_no_normalization(self): + u = x509.URI('https://example.com/') + self.assertEqual('https://example.com/', u.native) + self.assertEqual('https://example.com/', u.__unicode__()) + self.assertEqual(b'\x16\x14https://example.com/', u.dump()) + u2 = x509.URI('https://example.com') + self.assertEqual('https://example.com', u2.native) + self.assertEqual('https://example.com', u2.__unicode__()) + self.assertEqual(b'\x16\x13https://example.com', u2.dump()) + u3 = x509.URI('https://example.com:443/') + self.assertEqual('https://example.com:443/', u3.native) + self.assertEqual('https://example.com:443/', u3.__unicode__()) + self.assertEqual(b'\x16\x18https://example.com:443/', u3.dump()) + def test_indef_uri(self): u = x509.URI.load(b'\x36\x80\x16\x07https:/\x16\x07/exampl\x16\x05e.com\x00\x00') self.assertEqual('https://example.com', u.native) @@ -530,6 +544,11 @@ class X509Tests(unittest.TestCase): 'ecdsa', 'sha256' ), + ( + 'keys/test-rsapss.crt', + 'rsassa_pss', + 'sha256' + ), ) @data('signature_algo_info') @@ -1857,7 +1876,7 @@ class X509Tests(unittest.TestCase): b'\x80\x0f20170918151736Z\x81\x0f20180101041421Z' ), ) - + @data('private_key_usage_period_value_info') def private_key_usage_period_value(self, relative_path, private_key_usage_period_value): cert = self._load_cert(relative_path) @@ -3390,3 +3409,13 @@ class X509Tests(unittest.TestCase): '{}', x509.DirectoryString.load(b'\x14\x02{}').native ) + + def test_validity_after_before(self): + cert = self._load_cert("keys/test-validity.crt") + + self.assertEqual(cert.not_valid_after, datetime(2118, 1, 28, 12, 27, 39, tzinfo=util.timezone.utc)) + self.assertEqual(cert.not_valid_before, datetime(2018, 2, 21, 12, 27, 39, tzinfo=util.timezone.utc)) + + def test_invalid_email_encoding(self): + cert = self._load_cert("invalid_email_tag.pem") + self.assertEqual('info@keyweb.de', cert.subject.native['email_address']) diff --git a/tests/unittest_data.py b/tests/unittest_data.py index 5ceac54..a256dd7 100644 --- a/tests/unittest_data.py +++ b/tests/unittest_data.py @@ -44,9 +44,12 @@ def data_decorator(cls): else: data_name = num expanded_name = 'test_%s_%s' % (name, data_name) + # We used expanded variable names here since this line is present in # backtraces that are generated from test failures. - generated_test_function = lambda self: original_function(self, *params) + def generated_test_function(self): + original_function(self, *params) + setattr(cls, expanded_name, generated_test_function) for name in dir(cls): @@ -1,5 +1,5 @@ [tox] -envlist = py26,py27,py32,py33,py34,py35,py36,pypy +envlist = py26,py27,py32,py33,py34,py35,py36,py37,pypy [testenv] deps = -rrequires/ci |