diff options
author | Kelvin Zhang <zhangkelvin@google.com> | 2020-07-29 16:37:51 -0400 |
---|---|---|
committer | Kelvin Zhang <zhangkelvin@google.com> | 2020-08-10 16:22:22 -0400 |
commit | cff4d7606dd195d8c2726867024b310a23a4ac8b (patch) | |
tree | 4926781b6ccd866d95776c7e202ee8314dba7b94 /tools | |
parent | 3443517fcbb245d567d7c87a53852ad4703dde08 (diff) | |
download | platform_build-cff4d7606dd195d8c2726867024b310a23a4ac8b.tar.gz platform_build-cff4d7606dd195d8c2726867024b310a23a4ac8b.tar.bz2 platform_build-cff4d7606dd195d8c2726867024b310a23a4ac8b.zip |
Move non-AB OTA generation code to a separate file
Test: Generate a non-AB OTA, apply it
Change-Id: I2f1afbe70d17356fcbf4d59901d201a76a3d6c4f
Diffstat (limited to 'tools')
-rw-r--r-- | tools/releasetools/Android.bp | 2 | ||||
-rwxr-xr-x | tools/releasetools/check_target_files_vintf.py | 46 | ||||
-rw-r--r-- | tools/releasetools/common.py | 2 | ||||
-rw-r--r-- | tools/releasetools/non_ab_ota.py | 684 | ||||
-rwxr-xr-x | tools/releasetools/ota_from_target_files.py | 1127 | ||||
-rw-r--r-- | tools/releasetools/ota_utils.py | 433 | ||||
-rw-r--r-- | tools/releasetools/test_non_ab_ota.py | 169 | ||||
-rw-r--r-- | tools/releasetools/test_ota_from_target_files.py | 165 | ||||
-rwxr-xr-x | tools/releasetools/test_utils.py | 36 |
9 files changed, 1384 insertions, 1280 deletions
diff --git a/tools/releasetools/Android.bp b/tools/releasetools/Android.bp index 11f92abb17..0ca2b3791c 100644 --- a/tools/releasetools/Android.bp +++ b/tools/releasetools/Android.bp @@ -93,7 +93,9 @@ python_defaults { srcs: [ "edify_generator.py", "ota_from_target_files.py", + "non_ab_ota.py", "target_files_diff.py", + "ota_utils.py", ], libs: [ "releasetools_check_target_files_vintf", diff --git a/tools/releasetools/check_target_files_vintf.py b/tools/releasetools/check_target_files_vintf.py index ef66112355..0edefac9c1 100755 --- a/tools/releasetools/check_target_files_vintf.py +++ b/tools/releasetools/check_target_files_vintf.py @@ -220,6 +220,52 @@ def CheckVintf(inp, info_dict=None): raise ValueError('{} is not a valid directory or zip file'.format(inp)) +def CheckVintfIfTrebleEnabled(target_files, target_info): + """Checks compatibility info of the input target files. + + Metadata used for compatibility verification is retrieved from target_zip. + + Compatibility should only be checked for devices that have enabled + Treble support. + + Args: + target_files: Path to zip file containing the source files to be included + for OTA. Can also be the path to extracted directory. + target_info: The BuildInfo instance that holds the target build info. + """ + + # Will only proceed if the target has enabled the Treble support (as well as + # having a /vendor partition). + if not HasTrebleEnabled(target_files, target_info): + return + + # Skip adding the compatibility package as a workaround for b/114240221. The + # compatibility will always fail on devices without qualified kernels. + if OPTIONS.skip_compatibility_check: + return + + if not CheckVintf(target_files, target_info): + raise RuntimeError("VINTF compatibility check failed") + +def HasTrebleEnabled(target_files, target_info): + def HasVendorPartition(target_files): + if os.path.isdir(target_files): + return os.path.isdir(os.path.join(target_files, "VENDOR")) + if zipfile.is_zipfile(target_files): + return HasPartition(zipfile.ZipFile(target_files), "vendor") + raise ValueError("Unknown target_files argument") + + return (HasVendorPartition(target_files) and + target_info.GetBuildProp("ro.treble.enabled") == "true") + + +def HasPartition(target_files_zip, partition): + try: + target_files_zip.getinfo(partition.upper() + "/") + return True + except KeyError: + return False + def main(argv): args = common.ParseOptions(argv, __doc__) diff --git a/tools/releasetools/common.py b/tools/releasetools/common.py index 1846a67de1..89900d3eea 100644 --- a/tools/releasetools/common.py +++ b/tools/releasetools/common.py @@ -1227,7 +1227,7 @@ def _MakeRamdisk(sourcedir, fs_config_file=None, lz4_ramdisks=False): cmd = ["mkbootfs", os.path.join(sourcedir, "RAMDISK")] p1 = Run(cmd, stdout=subprocess.PIPE) if lz4_ramdisks: - p2 = Run(["lz4", "-l", "-12" , "--favor-decSpeed"], stdin=p1.stdout, + p2 = Run(["lz4", "-l", "-12", "--favor-decSpeed"], stdin=p1.stdout, stdout=ramdisk_img.file.fileno()) else: p2 = Run(["minigzip"], stdin=p1.stdout, stdout=ramdisk_img.file.fileno()) diff --git a/tools/releasetools/non_ab_ota.py b/tools/releasetools/non_ab_ota.py new file mode 100644 index 0000000000..3a8795798b --- /dev/null +++ b/tools/releasetools/non_ab_ota.py @@ -0,0 +1,684 @@ +# Copyright (C) 2020 The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import collections +import logging +import os +import zipfile + +import common +import edify_generator +import verity_utils +from check_target_files_vintf import CheckVintfIfTrebleEnabled, HasPartition +from common import OPTIONS +from ota_utils import UNZIP_PATTERN, FinalizeMetadata, GetPackageMetadata, PropertyFiles + +logger = logging.getLogger(__name__) + + +def GetBlockDifferences(target_zip, source_zip, target_info, source_info, + device_specific): + """Returns a ordered dict of block differences with partition name as key.""" + + def GetIncrementalBlockDifferenceForPartition(name): + if not HasPartition(source_zip, name): + raise RuntimeError( + "can't generate incremental that adds {}".format(name)) + + partition_src = common.GetUserImage(name, OPTIONS.source_tmp, source_zip, + info_dict=source_info, + allow_shared_blocks=allow_shared_blocks) + + hashtree_info_generator = verity_utils.CreateHashtreeInfoGenerator( + name, 4096, target_info) + partition_tgt = common.GetUserImage(name, OPTIONS.target_tmp, target_zip, + info_dict=target_info, + allow_shared_blocks=allow_shared_blocks, + hashtree_info_generator=hashtree_info_generator) + + # Check the first block of the source system partition for remount R/W only + # if the filesystem is ext4. + partition_source_info = source_info["fstab"]["/" + name] + check_first_block = partition_source_info.fs_type == "ext4" + # Disable using imgdiff for squashfs. 'imgdiff -z' expects input files to be + # in zip formats. However with squashfs, a) all files are compressed in LZ4; + # b) the blocks listed in block map may not contain all the bytes for a + # given file (because they're rounded to be 4K-aligned). + partition_target_info = target_info["fstab"]["/" + name] + disable_imgdiff = (partition_source_info.fs_type == "squashfs" or + partition_target_info.fs_type == "squashfs") + return common.BlockDifference(name, partition_tgt, partition_src, + check_first_block, + version=blockimgdiff_version, + disable_imgdiff=disable_imgdiff) + + if source_zip: + # See notes in common.GetUserImage() + allow_shared_blocks = (source_info.get('ext4_share_dup_blocks') == "true" or + target_info.get('ext4_share_dup_blocks') == "true") + blockimgdiff_version = max( + int(i) for i in target_info.get( + "blockimgdiff_versions", "1").split(",")) + assert blockimgdiff_version >= 3 + + block_diff_dict = collections.OrderedDict() + partition_names = ["system", "vendor", "product", "odm", "system_ext", + "vendor_dlkm", "odm_dlkm"] + for partition in partition_names: + if not HasPartition(target_zip, partition): + continue + # Full OTA update. + if not source_zip: + tgt = common.GetUserImage(partition, OPTIONS.input_tmp, target_zip, + info_dict=target_info, + reset_file_map=True) + block_diff_dict[partition] = common.BlockDifference(partition, tgt, + src=None) + # Incremental OTA update. + else: + block_diff_dict[partition] = GetIncrementalBlockDifferenceForPartition( + partition) + assert "system" in block_diff_dict + + # Get the block diffs from the device specific script. If there is a + # duplicate block diff for a partition, ignore the diff in the generic script + # and use the one in the device specific script instead. + if source_zip: + device_specific_diffs = device_specific.IncrementalOTA_GetBlockDifferences() + function_name = "IncrementalOTA_GetBlockDifferences" + else: + device_specific_diffs = device_specific.FullOTA_GetBlockDifferences() + function_name = "FullOTA_GetBlockDifferences" + + if device_specific_diffs: + assert all(isinstance(diff, common.BlockDifference) + for diff in device_specific_diffs), \ + "{} is not returning a list of BlockDifference objects".format( + function_name) + for diff in device_specific_diffs: + if diff.partition in block_diff_dict: + logger.warning("Duplicate block difference found. Device specific block" + " diff for partition '%s' overrides the one in generic" + " script.", diff.partition) + block_diff_dict[diff.partition] = diff + + return block_diff_dict + + +def WriteFullOTAPackage(input_zip, output_file): + target_info = common.BuildInfo(OPTIONS.info_dict, OPTIONS.oem_dicts) + + # We don't know what version it will be installed on top of. We expect the API + # just won't change very often. Similarly for fstab, it might have changed in + # the target build. + target_api_version = target_info["recovery_api_version"] + script = edify_generator.EdifyGenerator(target_api_version, target_info) + + if target_info.oem_props and not OPTIONS.oem_no_mount: + target_info.WriteMountOemScript(script) + + metadata = GetPackageMetadata(target_info) + + if not OPTIONS.no_signing: + staging_file = common.MakeTempFile(suffix='.zip') + else: + staging_file = output_file + + output_zip = zipfile.ZipFile( + staging_file, "w", compression=zipfile.ZIP_DEFLATED) + + device_specific = common.DeviceSpecificParams( + input_zip=input_zip, + input_version=target_api_version, + output_zip=output_zip, + script=script, + input_tmp=OPTIONS.input_tmp, + metadata=metadata, + info_dict=OPTIONS.info_dict) + + assert HasRecoveryPatch(input_zip, info_dict=OPTIONS.info_dict) + + # Assertions (e.g. downgrade check, device properties check). + ts = target_info.GetBuildProp("ro.build.date.utc") + ts_text = target_info.GetBuildProp("ro.build.date") + script.AssertOlderBuild(ts, ts_text) + + target_info.WriteDeviceAssertions(script, OPTIONS.oem_no_mount) + device_specific.FullOTA_Assertions() + + block_diff_dict = GetBlockDifferences(target_zip=input_zip, source_zip=None, + target_info=target_info, + source_info=None, + device_specific=device_specific) + + # Two-step package strategy (in chronological order, which is *not* + # the order in which the generated script has things): + # + # if stage is not "2/3" or "3/3": + # write recovery image to boot partition + # set stage to "2/3" + # reboot to boot partition and restart recovery + # else if stage is "2/3": + # write recovery image to recovery partition + # set stage to "3/3" + # reboot to recovery partition and restart recovery + # else: + # (stage must be "3/3") + # set stage to "" + # do normal full package installation: + # wipe and install system, boot image, etc. + # set up system to update recovery partition on first boot + # complete script normally + # (allow recovery to mark itself finished and reboot) + + recovery_img = common.GetBootableImage("recovery.img", "recovery.img", + OPTIONS.input_tmp, "RECOVERY") + if OPTIONS.two_step: + if not target_info.get("multistage_support"): + assert False, "two-step packages not supported by this build" + fs = target_info["fstab"]["/misc"] + assert fs.fs_type.upper() == "EMMC", \ + "two-step packages only supported on devices with EMMC /misc partitions" + bcb_dev = {"bcb_dev": fs.device} + common.ZipWriteStr(output_zip, "recovery.img", recovery_img.data) + script.AppendExtra(""" +if get_stage("%(bcb_dev)s") == "2/3" then +""" % bcb_dev) + + # Stage 2/3: Write recovery image to /recovery (currently running /boot). + script.Comment("Stage 2/3") + script.WriteRawImage("/recovery", "recovery.img") + script.AppendExtra(""" +set_stage("%(bcb_dev)s", "3/3"); +reboot_now("%(bcb_dev)s", "recovery"); +else if get_stage("%(bcb_dev)s") == "3/3" then +""" % bcb_dev) + + # Stage 3/3: Make changes. + script.Comment("Stage 3/3") + + # Dump fingerprints + script.Print("Target: {}".format(target_info.fingerprint)) + + device_specific.FullOTA_InstallBegin() + + # All other partitions as well as the data wipe use 10% of the progress, and + # the update of the system partition takes the remaining progress. + system_progress = 0.9 - (len(block_diff_dict) - 1) * 0.1 + if OPTIONS.wipe_user_data: + system_progress -= 0.1 + progress_dict = {partition: 0.1 for partition in block_diff_dict} + progress_dict["system"] = system_progress + + if target_info.get('use_dynamic_partitions') == "true": + # Use empty source_info_dict to indicate that all partitions / groups must + # be re-added. + dynamic_partitions_diff = common.DynamicPartitionsDifference( + info_dict=OPTIONS.info_dict, + block_diffs=block_diff_dict.values(), + progress_dict=progress_dict) + dynamic_partitions_diff.WriteScript(script, output_zip, + write_verify_script=OPTIONS.verify) + else: + for block_diff in block_diff_dict.values(): + block_diff.WriteScript(script, output_zip, + progress=progress_dict.get(block_diff.partition), + write_verify_script=OPTIONS.verify) + + CheckVintfIfTrebleEnabled(OPTIONS.input_tmp, target_info) + + boot_img = common.GetBootableImage( + "boot.img", "boot.img", OPTIONS.input_tmp, "BOOT") + common.CheckSize(boot_img.data, "boot.img", target_info) + common.ZipWriteStr(output_zip, "boot.img", boot_img.data) + + script.WriteRawImage("/boot", "boot.img") + + script.ShowProgress(0.1, 10) + device_specific.FullOTA_InstallEnd() + + if OPTIONS.extra_script is not None: + script.AppendExtra(OPTIONS.extra_script) + + script.UnmountAll() + + if OPTIONS.wipe_user_data: + script.ShowProgress(0.1, 10) + script.FormatPartition("/data") + + if OPTIONS.two_step: + script.AppendExtra(""" +set_stage("%(bcb_dev)s", ""); +""" % bcb_dev) + script.AppendExtra("else\n") + + # Stage 1/3: Nothing to verify for full OTA. Write recovery image to /boot. + script.Comment("Stage 1/3") + _WriteRecoveryImageToBoot(script, output_zip) + + script.AppendExtra(""" +set_stage("%(bcb_dev)s", "2/3"); +reboot_now("%(bcb_dev)s", ""); +endif; +endif; +""" % bcb_dev) + + script.SetProgress(1) + script.AddToZip(input_zip, output_zip, input_path=OPTIONS.updater_binary) + metadata["ota-required-cache"] = str(script.required_cache) + + # We haven't written the metadata entry, which will be done in + # FinalizeMetadata. + common.ZipClose(output_zip) + + needed_property_files = ( + NonAbOtaPropertyFiles(), + ) + FinalizeMetadata(metadata, staging_file, output_file, needed_property_files) + + +def WriteBlockIncrementalOTAPackage(target_zip, source_zip, output_file): + target_info = common.BuildInfo(OPTIONS.target_info_dict, OPTIONS.oem_dicts) + source_info = common.BuildInfo(OPTIONS.source_info_dict, OPTIONS.oem_dicts) + + target_api_version = target_info["recovery_api_version"] + source_api_version = source_info["recovery_api_version"] + if source_api_version == 0: + logger.warning( + "Generating edify script for a source that can't install it.") + + script = edify_generator.EdifyGenerator( + source_api_version, target_info, fstab=source_info["fstab"]) + + if target_info.oem_props or source_info.oem_props: + if not OPTIONS.oem_no_mount: + source_info.WriteMountOemScript(script) + + metadata = GetPackageMetadata(target_info, source_info) + + if not OPTIONS.no_signing: + staging_file = common.MakeTempFile(suffix='.zip') + else: + staging_file = output_file + + output_zip = zipfile.ZipFile( + staging_file, "w", compression=zipfile.ZIP_DEFLATED) + + device_specific = common.DeviceSpecificParams( + source_zip=source_zip, + source_version=source_api_version, + source_tmp=OPTIONS.source_tmp, + target_zip=target_zip, + target_version=target_api_version, + target_tmp=OPTIONS.target_tmp, + output_zip=output_zip, + script=script, + metadata=metadata, + info_dict=source_info) + + source_boot = common.GetBootableImage( + "/tmp/boot.img", "boot.img", OPTIONS.source_tmp, "BOOT", source_info) + target_boot = common.GetBootableImage( + "/tmp/boot.img", "boot.img", OPTIONS.target_tmp, "BOOT", target_info) + updating_boot = (not OPTIONS.two_step and + (source_boot.data != target_boot.data)) + + target_recovery = common.GetBootableImage( + "/tmp/recovery.img", "recovery.img", OPTIONS.target_tmp, "RECOVERY") + + block_diff_dict = GetBlockDifferences(target_zip=target_zip, + source_zip=source_zip, + target_info=target_info, + source_info=source_info, + device_specific=device_specific) + + CheckVintfIfTrebleEnabled(OPTIONS.target_tmp, target_info) + + # Assertions (e.g. device properties check). + target_info.WriteDeviceAssertions(script, OPTIONS.oem_no_mount) + device_specific.IncrementalOTA_Assertions() + + # Two-step incremental package strategy (in chronological order, + # which is *not* the order in which the generated script has + # things): + # + # if stage is not "2/3" or "3/3": + # do verification on current system + # write recovery image to boot partition + # set stage to "2/3" + # reboot to boot partition and restart recovery + # else if stage is "2/3": + # write recovery image to recovery partition + # set stage to "3/3" + # reboot to recovery partition and restart recovery + # else: + # (stage must be "3/3") + # perform update: + # patch system files, etc. + # force full install of new boot image + # set up system to update recovery partition on first boot + # complete script normally + # (allow recovery to mark itself finished and reboot) + + if OPTIONS.two_step: + if not source_info.get("multistage_support"): + assert False, "two-step packages not supported by this build" + fs = source_info["fstab"]["/misc"] + assert fs.fs_type.upper() == "EMMC", \ + "two-step packages only supported on devices with EMMC /misc partitions" + bcb_dev = {"bcb_dev": fs.device} + common.ZipWriteStr(output_zip, "recovery.img", target_recovery.data) + script.AppendExtra(""" +if get_stage("%(bcb_dev)s") == "2/3" then +""" % bcb_dev) + + # Stage 2/3: Write recovery image to /recovery (currently running /boot). + script.Comment("Stage 2/3") + script.AppendExtra("sleep(20);\n") + script.WriteRawImage("/recovery", "recovery.img") + script.AppendExtra(""" +set_stage("%(bcb_dev)s", "3/3"); +reboot_now("%(bcb_dev)s", "recovery"); +else if get_stage("%(bcb_dev)s") != "3/3" then +""" % bcb_dev) + + # Stage 1/3: (a) Verify the current system. + script.Comment("Stage 1/3") + + # Dump fingerprints + script.Print("Source: {}".format(source_info.fingerprint)) + script.Print("Target: {}".format(target_info.fingerprint)) + + script.Print("Verifying current system...") + + device_specific.IncrementalOTA_VerifyBegin() + + WriteFingerprintAssertion(script, target_info, source_info) + + # Check the required cache size (i.e. stashed blocks). + required_cache_sizes = [diff.required_cache for diff in + block_diff_dict.values()] + if updating_boot: + boot_type, boot_device_expr = common.GetTypeAndDeviceExpr("/boot", + source_info) + d = common.Difference(target_boot, source_boot) + _, _, d = d.ComputePatch() + if d is None: + include_full_boot = True + common.ZipWriteStr(output_zip, "boot.img", target_boot.data) + else: + include_full_boot = False + + logger.info( + "boot target: %d source: %d diff: %d", target_boot.size, + source_boot.size, len(d)) + + common.ZipWriteStr(output_zip, "boot.img.p", d) + + target_expr = 'concat("{}:",{},":{}:{}")'.format( + boot_type, boot_device_expr, target_boot.size, target_boot.sha1) + source_expr = 'concat("{}:",{},":{}:{}")'.format( + boot_type, boot_device_expr, source_boot.size, source_boot.sha1) + script.PatchPartitionExprCheck(target_expr, source_expr) + + required_cache_sizes.append(target_boot.size) + + if required_cache_sizes: + script.CacheFreeSpaceCheck(max(required_cache_sizes)) + + # Verify the existing partitions. + for diff in block_diff_dict.values(): + diff.WriteVerifyScript(script, touched_blocks_only=True) + + device_specific.IncrementalOTA_VerifyEnd() + + if OPTIONS.two_step: + # Stage 1/3: (b) Write recovery image to /boot. + _WriteRecoveryImageToBoot(script, output_zip) + + script.AppendExtra(""" +set_stage("%(bcb_dev)s", "2/3"); +reboot_now("%(bcb_dev)s", ""); +else +""" % bcb_dev) + + # Stage 3/3: Make changes. + script.Comment("Stage 3/3") + + script.Comment("---- start making changes here ----") + + device_specific.IncrementalOTA_InstallBegin() + + progress_dict = {partition: 0.1 for partition in block_diff_dict} + progress_dict["system"] = 1 - len(block_diff_dict) * 0.1 + + if OPTIONS.source_info_dict.get("use_dynamic_partitions") == "true": + if OPTIONS.target_info_dict.get("use_dynamic_partitions") != "true": + raise RuntimeError( + "can't generate incremental that disables dynamic partitions") + dynamic_partitions_diff = common.DynamicPartitionsDifference( + info_dict=OPTIONS.target_info_dict, + source_info_dict=OPTIONS.source_info_dict, + block_diffs=block_diff_dict.values(), + progress_dict=progress_dict) + dynamic_partitions_diff.WriteScript( + script, output_zip, write_verify_script=OPTIONS.verify) + else: + for block_diff in block_diff_dict.values(): + block_diff.WriteScript(script, output_zip, + progress=progress_dict.get(block_diff.partition), + write_verify_script=OPTIONS.verify) + + if OPTIONS.two_step: + common.ZipWriteStr(output_zip, "boot.img", target_boot.data) + script.WriteRawImage("/boot", "boot.img") + logger.info("writing full boot image (forced by two-step mode)") + + if not OPTIONS.two_step: + if updating_boot: + if include_full_boot: + logger.info("boot image changed; including full.") + script.Print("Installing boot image...") + script.WriteRawImage("/boot", "boot.img") + else: + # Produce the boot image by applying a patch to the current + # contents of the boot partition, and write it back to the + # partition. + logger.info("boot image changed; including patch.") + script.Print("Patching boot image...") + script.ShowProgress(0.1, 10) + target_expr = 'concat("{}:",{},":{}:{}")'.format( + boot_type, boot_device_expr, target_boot.size, target_boot.sha1) + source_expr = 'concat("{}:",{},":{}:{}")'.format( + boot_type, boot_device_expr, source_boot.size, source_boot.sha1) + script.PatchPartitionExpr(target_expr, source_expr, '"boot.img.p"') + else: + logger.info("boot image unchanged; skipping.") + + # Do device-specific installation (eg, write radio image). + device_specific.IncrementalOTA_InstallEnd() + + if OPTIONS.extra_script is not None: + script.AppendExtra(OPTIONS.extra_script) + + if OPTIONS.wipe_user_data: + script.Print("Erasing user data...") + script.FormatPartition("/data") + + if OPTIONS.two_step: + script.AppendExtra(""" +set_stage("%(bcb_dev)s", ""); +endif; +endif; +""" % bcb_dev) + + script.SetProgress(1) + # For downgrade OTAs, we prefer to use the update-binary in the source + # build that is actually newer than the one in the target build. + if OPTIONS.downgrade: + script.AddToZip(source_zip, output_zip, input_path=OPTIONS.updater_binary) + else: + script.AddToZip(target_zip, output_zip, input_path=OPTIONS.updater_binary) + metadata["ota-required-cache"] = str(script.required_cache) + + # We haven't written the metadata entry yet, which will be handled in + # FinalizeMetadata(). + common.ZipClose(output_zip) + + # Sign the generated zip package unless no_signing is specified. + needed_property_files = ( + NonAbOtaPropertyFiles(), + ) + FinalizeMetadata(metadata, staging_file, output_file, needed_property_files) + + +def GenerateNonAbOtaPackage(target_file, output_file, source_file=None): + """Generates a non-A/B OTA package.""" + # Check the loaded info dicts first. + if OPTIONS.info_dict.get("no_recovery") == "true": + raise common.ExternalError( + "--- target build has specified no recovery ---") + + # Non-A/B OTAs rely on /cache partition to store temporary files. + cache_size = OPTIONS.info_dict.get("cache_size") + if cache_size is None: + logger.warning("--- can't determine the cache partition size ---") + OPTIONS.cache_size = cache_size + + if OPTIONS.extra_script is not None: + with open(OPTIONS.extra_script) as fp: + OPTIONS.extra_script = fp.read() + + if OPTIONS.extracted_input is not None: + OPTIONS.input_tmp = OPTIONS.extracted_input + else: + logger.info("unzipping target target-files...") + OPTIONS.input_tmp = common.UnzipTemp(target_file, UNZIP_PATTERN) + OPTIONS.target_tmp = OPTIONS.input_tmp + + # If the caller explicitly specified the device-specific extensions path via + # -s / --device_specific, use that. Otherwise, use META/releasetools.py if it + # is present in the target target_files. Otherwise, take the path of the file + # from 'tool_extensions' in the info dict and look for that in the local + # filesystem, relative to the current directory. + if OPTIONS.device_specific is None: + from_input = os.path.join(OPTIONS.input_tmp, "META", "releasetools.py") + if os.path.exists(from_input): + logger.info("(using device-specific extensions from target_files)") + OPTIONS.device_specific = from_input + else: + OPTIONS.device_specific = OPTIONS.info_dict.get("tool_extensions") + + if OPTIONS.device_specific is not None: + OPTIONS.device_specific = os.path.abspath(OPTIONS.device_specific) + + # Generate a full OTA. + if source_file is None: + with zipfile.ZipFile(target_file) as input_zip: + WriteFullOTAPackage( + input_zip, + output_file) + + # Generate an incremental OTA. + else: + logger.info("unzipping source target-files...") + OPTIONS.source_tmp = common.UnzipTemp( + OPTIONS.incremental_source, UNZIP_PATTERN) + with zipfile.ZipFile(target_file) as input_zip, \ + zipfile.ZipFile(source_file) as source_zip: + WriteBlockIncrementalOTAPackage( + input_zip, + source_zip, + output_file) + + +def WriteFingerprintAssertion(script, target_info, source_info): + source_oem_props = source_info.oem_props + target_oem_props = target_info.oem_props + + if source_oem_props is None and target_oem_props is None: + script.AssertSomeFingerprint( + source_info.fingerprint, target_info.fingerprint) + elif source_oem_props is not None and target_oem_props is not None: + script.AssertSomeThumbprint( + target_info.GetBuildProp("ro.build.thumbprint"), + source_info.GetBuildProp("ro.build.thumbprint")) + elif source_oem_props is None and target_oem_props is not None: + script.AssertFingerprintOrThumbprint( + source_info.fingerprint, + target_info.GetBuildProp("ro.build.thumbprint")) + else: + script.AssertFingerprintOrThumbprint( + target_info.fingerprint, + source_info.GetBuildProp("ro.build.thumbprint")) + + +class NonAbOtaPropertyFiles(PropertyFiles): + """The property-files for non-A/B OTA. + + For non-A/B OTA, the property-files string contains the info for METADATA + entry, with which a system updater can be fetched the package metadata prior + to downloading the entire package. + """ + + def __init__(self): + super(NonAbOtaPropertyFiles, self).__init__() + self.name = 'ota-property-files' + + +def _WriteRecoveryImageToBoot(script, output_zip): + """Find and write recovery image to /boot in two-step OTA. + + In two-step OTAs, we write recovery image to /boot as the first step so that + we can reboot to there and install a new recovery image to /recovery. + A special "recovery-two-step.img" will be preferred, which encodes the correct + path of "/boot". Otherwise the device may show "device is corrupt" message + when booting into /boot. + + Fall back to using the regular recovery.img if the two-step recovery image + doesn't exist. Note that rebuilding the special image at this point may be + infeasible, because we don't have the desired boot signer and keys when + calling ota_from_target_files.py. + """ + + recovery_two_step_img_name = "recovery-two-step.img" + recovery_two_step_img_path = os.path.join( + OPTIONS.input_tmp, "OTA", recovery_two_step_img_name) + if os.path.exists(recovery_two_step_img_path): + common.ZipWrite( + output_zip, + recovery_two_step_img_path, + arcname=recovery_two_step_img_name) + logger.info( + "two-step package: using %s in stage 1/3", recovery_two_step_img_name) + script.WriteRawImage("/boot", recovery_two_step_img_name) + else: + logger.info("two-step package: using recovery.img in stage 1/3") + # The "recovery.img" entry has been written into package earlier. + script.WriteRawImage("/boot", "recovery.img") + + +def HasRecoveryPatch(target_files_zip, info_dict): + board_uses_vendorimage = info_dict.get("board_uses_vendorimage") == "true" + + if board_uses_vendorimage: + target_files_dir = "VENDOR" + else: + target_files_dir = "SYSTEM/vendor" + + patch = "%s/recovery-from-boot.p" % target_files_dir + img = "%s/etc/recovery.img" % target_files_dir + + namelist = target_files_zip.namelist() + return patch in namelist or img in namelist diff --git a/tools/releasetools/ota_from_target_files.py b/tools/releasetools/ota_from_target_files.py index b70044e3f8..f42974f6a5 100755 --- a/tools/releasetools/ota_from_target_files.py +++ b/tools/releasetools/ota_from_target_files.py @@ -206,9 +206,6 @@ A/B OTA specific options from __future__ import print_function -import collections -import copy -import itertools import logging import multiprocessing import os.path @@ -218,12 +215,12 @@ import struct import sys import zipfile -import check_target_files_vintf import common -import edify_generator import target_files_diff -import verity_utils - +from check_target_files_vintf import CheckVintfIfTrebleEnabled +from non_ab_ota import GenerateNonAbOtaPackage +from ota_utils import (UNZIP_PATTERN, FinalizeMetadata, GetPackageMetadata, + PropertyFiles) if sys.hexversion < 0x02070000: print("Python 2.7 or newer is required.", file=sys.stderr) @@ -270,11 +267,10 @@ OPTIONS.force_non_ab = False OPTIONS.boot_variable_file = None -METADATA_NAME = 'META-INF/com/android/metadata' POSTINSTALL_CONFIG = 'META/postinstall_config.txt' DYNAMIC_PARTITION_INFO = 'META/dynamic_partitions_info.txt' AB_PARTITIONS = 'META/ab_partitions.txt' -UNZIP_PATTERN = ['IMAGES/*', 'META/*', 'OTA/*', 'RADIO/*'] + # Files to be unzipped for target diffing purpose. TARGET_DIFFING_UNZIP_PATTERN = ['BOOT', 'RECOVERY', 'SYSTEM/*', 'VENDOR/*', 'PRODUCT/*', 'SYSTEM_EXT/*', 'ODM/*', @@ -488,13 +484,6 @@ class Payload(object): compress_type=zipfile.ZIP_STORED) -def SignOutput(temp_zip_name, output_zip_name): - pw = OPTIONS.key_passwords[OPTIONS.package_key] - - common.SignFile(temp_zip_name, output_zip_name, OPTIONS.package_key, pw, - whole_file=True) - - def _LoadOemDicts(oem_source): """Returns the list of loaded OEM properties dict.""" if not oem_source: @@ -507,658 +496,6 @@ def _LoadOemDicts(oem_source): return oem_dicts -def _WriteRecoveryImageToBoot(script, output_zip): - """Find and write recovery image to /boot in two-step OTA. - - In two-step OTAs, we write recovery image to /boot as the first step so that - we can reboot to there and install a new recovery image to /recovery. - A special "recovery-two-step.img" will be preferred, which encodes the correct - path of "/boot". Otherwise the device may show "device is corrupt" message - when booting into /boot. - - Fall back to using the regular recovery.img if the two-step recovery image - doesn't exist. Note that rebuilding the special image at this point may be - infeasible, because we don't have the desired boot signer and keys when - calling ota_from_target_files.py. - """ - - recovery_two_step_img_name = "recovery-two-step.img" - recovery_two_step_img_path = os.path.join( - OPTIONS.input_tmp, "OTA", recovery_two_step_img_name) - if os.path.exists(recovery_two_step_img_path): - common.ZipWrite( - output_zip, - recovery_two_step_img_path, - arcname=recovery_two_step_img_name) - logger.info( - "two-step package: using %s in stage 1/3", recovery_two_step_img_name) - script.WriteRawImage("/boot", recovery_two_step_img_name) - else: - logger.info("two-step package: using recovery.img in stage 1/3") - # The "recovery.img" entry has been written into package earlier. - script.WriteRawImage("/boot", "recovery.img") - - -def HasRecoveryPatch(target_files_zip, info_dict): - board_uses_vendorimage = info_dict.get("board_uses_vendorimage") == "true" - - if board_uses_vendorimage: - target_files_dir = "VENDOR" - else: - target_files_dir = "SYSTEM/vendor" - - patch = "%s/recovery-from-boot.p" % target_files_dir - img = "%s/etc/recovery.img" % target_files_dir - - namelist = target_files_zip.namelist() - return patch in namelist or img in namelist - - -def HasPartition(target_files_zip, partition): - try: - target_files_zip.getinfo(partition.upper() + "/") - return True - except KeyError: - return False - - -def HasTrebleEnabled(target_files, target_info): - def HasVendorPartition(target_files): - if os.path.isdir(target_files): - return os.path.isdir(os.path.join(target_files, "VENDOR")) - if zipfile.is_zipfile(target_files): - return HasPartition(zipfile.ZipFile(target_files), "vendor") - raise ValueError("Unknown target_files argument") - - return (HasVendorPartition(target_files) and - target_info.GetBuildProp("ro.treble.enabled") == "true") - - -def WriteFingerprintAssertion(script, target_info, source_info): - source_oem_props = source_info.oem_props - target_oem_props = target_info.oem_props - - if source_oem_props is None and target_oem_props is None: - script.AssertSomeFingerprint( - source_info.fingerprint, target_info.fingerprint) - elif source_oem_props is not None and target_oem_props is not None: - script.AssertSomeThumbprint( - target_info.GetBuildProp("ro.build.thumbprint"), - source_info.GetBuildProp("ro.build.thumbprint")) - elif source_oem_props is None and target_oem_props is not None: - script.AssertFingerprintOrThumbprint( - source_info.fingerprint, - target_info.GetBuildProp("ro.build.thumbprint")) - else: - script.AssertFingerprintOrThumbprint( - target_info.fingerprint, - source_info.GetBuildProp("ro.build.thumbprint")) - - -def CheckVintfIfTrebleEnabled(target_files, target_info): - """Checks compatibility info of the input target files. - - Metadata used for compatibility verification is retrieved from target_zip. - - Compatibility should only be checked for devices that have enabled - Treble support. - - Args: - target_files: Path to zip file containing the source files to be included - for OTA. Can also be the path to extracted directory. - target_info: The BuildInfo instance that holds the target build info. - """ - - # Will only proceed if the target has enabled the Treble support (as well as - # having a /vendor partition). - if not HasTrebleEnabled(target_files, target_info): - return - - # Skip adding the compatibility package as a workaround for b/114240221. The - # compatibility will always fail on devices without qualified kernels. - if OPTIONS.skip_compatibility_check: - return - - if not check_target_files_vintf.CheckVintf(target_files, target_info): - raise RuntimeError("VINTF compatibility check failed") - - -def GetBlockDifferences(target_zip, source_zip, target_info, source_info, - device_specific): - """Returns a ordered dict of block differences with partition name as key.""" - - def GetIncrementalBlockDifferenceForPartition(name): - if not HasPartition(source_zip, name): - raise RuntimeError( - "can't generate incremental that adds {}".format(name)) - - partition_src = common.GetUserImage(name, OPTIONS.source_tmp, source_zip, - info_dict=source_info, - allow_shared_blocks=allow_shared_blocks) - - hashtree_info_generator = verity_utils.CreateHashtreeInfoGenerator( - name, 4096, target_info) - partition_tgt = common.GetUserImage(name, OPTIONS.target_tmp, target_zip, - info_dict=target_info, - allow_shared_blocks=allow_shared_blocks, - hashtree_info_generator=hashtree_info_generator) - - # Check the first block of the source system partition for remount R/W only - # if the filesystem is ext4. - partition_source_info = source_info["fstab"]["/" + name] - check_first_block = partition_source_info.fs_type == "ext4" - # Disable using imgdiff for squashfs. 'imgdiff -z' expects input files to be - # in zip formats. However with squashfs, a) all files are compressed in LZ4; - # b) the blocks listed in block map may not contain all the bytes for a - # given file (because they're rounded to be 4K-aligned). - partition_target_info = target_info["fstab"]["/" + name] - disable_imgdiff = (partition_source_info.fs_type == "squashfs" or - partition_target_info.fs_type == "squashfs") - return common.BlockDifference(name, partition_tgt, partition_src, - check_first_block, - version=blockimgdiff_version, - disable_imgdiff=disable_imgdiff) - - if source_zip: - # See notes in common.GetUserImage() - allow_shared_blocks = (source_info.get('ext4_share_dup_blocks') == "true" or - target_info.get('ext4_share_dup_blocks') == "true") - blockimgdiff_version = max( - int(i) for i in target_info.get( - "blockimgdiff_versions", "1").split(",")) - assert blockimgdiff_version >= 3 - - block_diff_dict = collections.OrderedDict() - partition_names = ["system", "vendor", "product", "odm", "system_ext", - "vendor_dlkm", "odm_dlkm"] - for partition in partition_names: - if not HasPartition(target_zip, partition): - continue - # Full OTA update. - if not source_zip: - tgt = common.GetUserImage(partition, OPTIONS.input_tmp, target_zip, - info_dict=target_info, - reset_file_map=True) - block_diff_dict[partition] = common.BlockDifference(partition, tgt, - src=None) - # Incremental OTA update. - else: - block_diff_dict[partition] = GetIncrementalBlockDifferenceForPartition( - partition) - assert "system" in block_diff_dict - - # Get the block diffs from the device specific script. If there is a - # duplicate block diff for a partition, ignore the diff in the generic script - # and use the one in the device specific script instead. - if source_zip: - device_specific_diffs = device_specific.IncrementalOTA_GetBlockDifferences() - function_name = "IncrementalOTA_GetBlockDifferences" - else: - device_specific_diffs = device_specific.FullOTA_GetBlockDifferences() - function_name = "FullOTA_GetBlockDifferences" - - if device_specific_diffs: - assert all(isinstance(diff, common.BlockDifference) - for diff in device_specific_diffs), \ - "{} is not returning a list of BlockDifference objects".format( - function_name) - for diff in device_specific_diffs: - if diff.partition in block_diff_dict: - logger.warning("Duplicate block difference found. Device specific block" - " diff for partition '%s' overrides the one in generic" - " script.", diff.partition) - block_diff_dict[diff.partition] = diff - - return block_diff_dict - - -def WriteFullOTAPackage(input_zip, output_file): - target_info = common.BuildInfo(OPTIONS.info_dict, OPTIONS.oem_dicts) - - # We don't know what version it will be installed on top of. We expect the API - # just won't change very often. Similarly for fstab, it might have changed in - # the target build. - target_api_version = target_info["recovery_api_version"] - script = edify_generator.EdifyGenerator(target_api_version, target_info) - - if target_info.oem_props and not OPTIONS.oem_no_mount: - target_info.WriteMountOemScript(script) - - metadata = GetPackageMetadata(target_info) - - if not OPTIONS.no_signing: - staging_file = common.MakeTempFile(suffix='.zip') - else: - staging_file = output_file - - output_zip = zipfile.ZipFile( - staging_file, "w", compression=zipfile.ZIP_DEFLATED) - - device_specific = common.DeviceSpecificParams( - input_zip=input_zip, - input_version=target_api_version, - output_zip=output_zip, - script=script, - input_tmp=OPTIONS.input_tmp, - metadata=metadata, - info_dict=OPTIONS.info_dict) - - assert HasRecoveryPatch(input_zip, info_dict=OPTIONS.info_dict) - - # Assertions (e.g. downgrade check, device properties check). - ts = target_info.GetBuildProp("ro.build.date.utc") - ts_text = target_info.GetBuildProp("ro.build.date") - script.AssertOlderBuild(ts, ts_text) - - target_info.WriteDeviceAssertions(script, OPTIONS.oem_no_mount) - device_specific.FullOTA_Assertions() - - block_diff_dict = GetBlockDifferences(target_zip=input_zip, source_zip=None, - target_info=target_info, - source_info=None, - device_specific=device_specific) - - # Two-step package strategy (in chronological order, which is *not* - # the order in which the generated script has things): - # - # if stage is not "2/3" or "3/3": - # write recovery image to boot partition - # set stage to "2/3" - # reboot to boot partition and restart recovery - # else if stage is "2/3": - # write recovery image to recovery partition - # set stage to "3/3" - # reboot to recovery partition and restart recovery - # else: - # (stage must be "3/3") - # set stage to "" - # do normal full package installation: - # wipe and install system, boot image, etc. - # set up system to update recovery partition on first boot - # complete script normally - # (allow recovery to mark itself finished and reboot) - - recovery_img = common.GetBootableImage("recovery.img", "recovery.img", - OPTIONS.input_tmp, "RECOVERY") - if OPTIONS.two_step: - if not target_info.get("multistage_support"): - assert False, "two-step packages not supported by this build" - fs = target_info["fstab"]["/misc"] - assert fs.fs_type.upper() == "EMMC", \ - "two-step packages only supported on devices with EMMC /misc partitions" - bcb_dev = {"bcb_dev": fs.device} - common.ZipWriteStr(output_zip, "recovery.img", recovery_img.data) - script.AppendExtra(""" -if get_stage("%(bcb_dev)s") == "2/3" then -""" % bcb_dev) - - # Stage 2/3: Write recovery image to /recovery (currently running /boot). - script.Comment("Stage 2/3") - script.WriteRawImage("/recovery", "recovery.img") - script.AppendExtra(""" -set_stage("%(bcb_dev)s", "3/3"); -reboot_now("%(bcb_dev)s", "recovery"); -else if get_stage("%(bcb_dev)s") == "3/3" then -""" % bcb_dev) - - # Stage 3/3: Make changes. - script.Comment("Stage 3/3") - - # Dump fingerprints - script.Print("Target: {}".format(target_info.fingerprint)) - - device_specific.FullOTA_InstallBegin() - - # All other partitions as well as the data wipe use 10% of the progress, and - # the update of the system partition takes the remaining progress. - system_progress = 0.9 - (len(block_diff_dict) - 1) * 0.1 - if OPTIONS.wipe_user_data: - system_progress -= 0.1 - progress_dict = {partition: 0.1 for partition in block_diff_dict} - progress_dict["system"] = system_progress - - if target_info.get('use_dynamic_partitions') == "true": - # Use empty source_info_dict to indicate that all partitions / groups must - # be re-added. - dynamic_partitions_diff = common.DynamicPartitionsDifference( - info_dict=OPTIONS.info_dict, - block_diffs=block_diff_dict.values(), - progress_dict=progress_dict) - dynamic_partitions_diff.WriteScript(script, output_zip, - write_verify_script=OPTIONS.verify) - else: - for block_diff in block_diff_dict.values(): - block_diff.WriteScript(script, output_zip, - progress=progress_dict.get(block_diff.partition), - write_verify_script=OPTIONS.verify) - - CheckVintfIfTrebleEnabled(OPTIONS.input_tmp, target_info) - - boot_img = common.GetBootableImage( - "boot.img", "boot.img", OPTIONS.input_tmp, "BOOT") - common.CheckSize(boot_img.data, "boot.img", target_info) - common.ZipWriteStr(output_zip, "boot.img", boot_img.data) - - script.WriteRawImage("/boot", "boot.img") - - script.ShowProgress(0.1, 10) - device_specific.FullOTA_InstallEnd() - - if OPTIONS.extra_script is not None: - script.AppendExtra(OPTIONS.extra_script) - - script.UnmountAll() - - if OPTIONS.wipe_user_data: - script.ShowProgress(0.1, 10) - script.FormatPartition("/data") - - if OPTIONS.two_step: - script.AppendExtra(""" -set_stage("%(bcb_dev)s", ""); -""" % bcb_dev) - script.AppendExtra("else\n") - - # Stage 1/3: Nothing to verify for full OTA. Write recovery image to /boot. - script.Comment("Stage 1/3") - _WriteRecoveryImageToBoot(script, output_zip) - - script.AppendExtra(""" -set_stage("%(bcb_dev)s", "2/3"); -reboot_now("%(bcb_dev)s", ""); -endif; -endif; -""" % bcb_dev) - - script.SetProgress(1) - script.AddToZip(input_zip, output_zip, input_path=OPTIONS.updater_binary) - metadata["ota-required-cache"] = str(script.required_cache) - - # We haven't written the metadata entry, which will be done in - # FinalizeMetadata. - common.ZipClose(output_zip) - - needed_property_files = ( - NonAbOtaPropertyFiles(), - ) - FinalizeMetadata(metadata, staging_file, output_file, needed_property_files) - - -def WriteMetadata(metadata, output): - """Writes the metadata to the zip archive or a file. - - Args: - metadata: The metadata dict for the package. - output: A ZipFile object or a string of the output file path. - """ - - value = "".join(["%s=%s\n" % kv for kv in sorted(metadata.items())]) - if isinstance(output, zipfile.ZipFile): - common.ZipWriteStr(output, METADATA_NAME, value, - compress_type=zipfile.ZIP_STORED) - return - - with open(output, 'w') as f: - f.write(value) - - -def HandleDowngradeMetadata(metadata, target_info, source_info): - # Only incremental OTAs are allowed to reach here. - assert OPTIONS.incremental_source is not None - - post_timestamp = target_info.GetBuildProp("ro.build.date.utc") - pre_timestamp = source_info.GetBuildProp("ro.build.date.utc") - is_downgrade = int(post_timestamp) < int(pre_timestamp) - - if OPTIONS.downgrade: - if not is_downgrade: - raise RuntimeError( - "--downgrade or --override_timestamp specified but no downgrade " - "detected: pre: %s, post: %s" % (pre_timestamp, post_timestamp)) - metadata["ota-downgrade"] = "yes" - else: - if is_downgrade: - raise RuntimeError( - "Downgrade detected based on timestamp check: pre: %s, post: %s. " - "Need to specify --override_timestamp OR --downgrade to allow " - "building the incremental." % (pre_timestamp, post_timestamp)) - - -def GetPackageMetadata(target_info, source_info=None): - """Generates and returns the metadata dict. - - It generates a dict() that contains the info to be written into an OTA - package (META-INF/com/android/metadata). It also handles the detection of - downgrade / data wipe based on the global options. - - Args: - target_info: The BuildInfo instance that holds the target build info. - source_info: The BuildInfo instance that holds the source build info, or - None if generating full OTA. - - Returns: - A dict to be written into package metadata entry. - """ - assert isinstance(target_info, common.BuildInfo) - assert source_info is None or isinstance(source_info, common.BuildInfo) - - separator = '|' - - boot_variable_values = {} - if OPTIONS.boot_variable_file: - d = common.LoadDictionaryFromFile(OPTIONS.boot_variable_file) - for key, values in d.items(): - boot_variable_values[key] = [val.strip() for val in values.split(',')] - - post_build_devices, post_build_fingerprints = \ - CalculateRuntimeDevicesAndFingerprints(target_info, boot_variable_values) - metadata = { - 'post-build': separator.join(sorted(post_build_fingerprints)), - 'post-build-incremental': target_info.GetBuildProp( - 'ro.build.version.incremental'), - 'post-sdk-level': target_info.GetBuildProp( - 'ro.build.version.sdk'), - 'post-security-patch-level': target_info.GetBuildProp( - 'ro.build.version.security_patch'), - } - - if target_info.is_ab and not OPTIONS.force_non_ab: - metadata['ota-type'] = 'AB' - metadata['ota-required-cache'] = '0' - else: - metadata['ota-type'] = 'BLOCK' - - if OPTIONS.wipe_user_data: - metadata['ota-wipe'] = 'yes' - - if OPTIONS.retrofit_dynamic_partitions: - metadata['ota-retrofit-dynamic-partitions'] = 'yes' - - is_incremental = source_info is not None - if is_incremental: - pre_build_devices, pre_build_fingerprints = \ - CalculateRuntimeDevicesAndFingerprints(source_info, - boot_variable_values) - metadata['pre-build'] = separator.join(sorted(pre_build_fingerprints)) - metadata['pre-build-incremental'] = source_info.GetBuildProp( - 'ro.build.version.incremental') - metadata['pre-device'] = separator.join(sorted(pre_build_devices)) - else: - metadata['pre-device'] = separator.join(sorted(post_build_devices)) - - # Use the actual post-timestamp, even for a downgrade case. - metadata['post-timestamp'] = target_info.GetBuildProp('ro.build.date.utc') - - # Detect downgrades and set up downgrade flags accordingly. - if is_incremental: - HandleDowngradeMetadata(metadata, target_info, source_info) - - return metadata - - -class PropertyFiles(object): - """A class that computes the property-files string for an OTA package. - - A property-files string is a comma-separated string that contains the - offset/size info for an OTA package. The entries, which must be ZIP_STORED, - can be fetched directly with the package URL along with the offset/size info. - These strings can be used for streaming A/B OTAs, or allowing an updater to - download package metadata entry directly, without paying the cost of - downloading entire package. - - Computing the final property-files string requires two passes. Because doing - the whole package signing (with signapk.jar) will possibly reorder the ZIP - entries, which may in turn invalidate earlier computed ZIP entry offset/size - values. - - This class provides functions to be called for each pass. The general flow is - as follows. - - property_files = PropertyFiles() - # The first pass, which writes placeholders before doing initial signing. - property_files.Compute() - SignOutput() - - # The second pass, by replacing the placeholders with actual data. - property_files.Finalize() - SignOutput() - - And the caller can additionally verify the final result. - - property_files.Verify() - """ - - def __init__(self): - self.name = None - self.required = () - self.optional = () - - def Compute(self, input_zip): - """Computes and returns a property-files string with placeholders. - - We reserve extra space for the offset and size of the metadata entry itself, - although we don't know the final values until the package gets signed. - - Args: - input_zip: The input ZIP file. - - Returns: - A string with placeholders for the metadata offset/size info, e.g. - "payload.bin:679:343,payload_properties.txt:378:45,metadata: ". - """ - return self.GetPropertyFilesString(input_zip, reserve_space=True) - - class InsufficientSpaceException(Exception): - pass - - def Finalize(self, input_zip, reserved_length): - """Finalizes a property-files string with actual METADATA offset/size info. - - The input ZIP file has been signed, with the ZIP entries in the desired - place (signapk.jar will possibly reorder the ZIP entries). Now we compute - the ZIP entry offsets and construct the property-files string with actual - data. Note that during this process, we must pad the property-files string - to the reserved length, so that the METADATA entry size remains the same. - Otherwise the entries' offsets and sizes may change again. - - Args: - input_zip: The input ZIP file. - reserved_length: The reserved length of the property-files string during - the call to Compute(). The final string must be no more than this - size. - - Returns: - A property-files string including the metadata offset/size info, e.g. - "payload.bin:679:343,payload_properties.txt:378:45,metadata:69:379 ". - - Raises: - InsufficientSpaceException: If the reserved length is insufficient to hold - the final string. - """ - result = self.GetPropertyFilesString(input_zip, reserve_space=False) - if len(result) > reserved_length: - raise self.InsufficientSpaceException( - 'Insufficient reserved space: reserved={}, actual={}'.format( - reserved_length, len(result))) - - result += ' ' * (reserved_length - len(result)) - return result - - def Verify(self, input_zip, expected): - """Verifies the input ZIP file contains the expected property-files string. - - Args: - input_zip: The input ZIP file. - expected: The property-files string that's computed from Finalize(). - - Raises: - AssertionError: On finding a mismatch. - """ - actual = self.GetPropertyFilesString(input_zip) - assert actual == expected, \ - "Mismatching streaming metadata: {} vs {}.".format(actual, expected) - - def GetPropertyFilesString(self, zip_file, reserve_space=False): - """ - Constructs the property-files string per request. - - Args: - zip_file: The input ZIP file. - reserved_length: The reserved length of the property-files string. - - Returns: - A property-files string including the metadata offset/size info, e.g. - "payload.bin:679:343,payload_properties.txt:378:45,metadata: ". - """ - - def ComputeEntryOffsetSize(name): - """Computes the zip entry offset and size.""" - info = zip_file.getinfo(name) - offset = info.header_offset - offset += zipfile.sizeFileHeader - offset += len(info.extra) + len(info.filename) - size = info.file_size - return '%s:%d:%d' % (os.path.basename(name), offset, size) - - tokens = [] - tokens.extend(self._GetPrecomputed(zip_file)) - for entry in self.required: - tokens.append(ComputeEntryOffsetSize(entry)) - for entry in self.optional: - if entry in zip_file.namelist(): - tokens.append(ComputeEntryOffsetSize(entry)) - - # 'META-INF/com/android/metadata' is required. We don't know its actual - # offset and length (as well as the values for other entries). So we reserve - # 15-byte as a placeholder ('offset:length'), which is sufficient to cover - # the space for metadata entry. Because 'offset' allows a max of 10-digit - # (i.e. ~9 GiB), with a max of 4-digit for the length. Note that all the - # reserved space serves the metadata entry only. - if reserve_space: - tokens.append('metadata:' + ' ' * 15) - else: - tokens.append(ComputeEntryOffsetSize(METADATA_NAME)) - - return ','.join(tokens) - - def _GetPrecomputed(self, input_zip): - """Computes the additional tokens to be included into the property-files. - - This applies to tokens without actual ZIP entries, such as - payload_metadadata.bin. We want to expose the offset/size to updaters, so - that they can download the payload metadata directly with the info. - - Args: - input_zip: The input zip file. - - Returns: - A list of strings (tokens) to be added to the property-files string. - """ - # pylint: disable=no-self-use - # pylint: disable=unused-argument - return [] - - class StreamingPropertyFiles(PropertyFiles): """A subclass for computing the property-files for streaming A/B OTAs.""" @@ -1264,362 +601,6 @@ class AbOtaPropertyFiles(StreamingPropertyFiles): return (payload_offset, metadata_total) -class NonAbOtaPropertyFiles(PropertyFiles): - """The property-files for non-A/B OTA. - - For non-A/B OTA, the property-files string contains the info for METADATA - entry, with which a system updater can be fetched the package metadata prior - to downloading the entire package. - """ - - def __init__(self): - super(NonAbOtaPropertyFiles, self).__init__() - self.name = 'ota-property-files' - - -def FinalizeMetadata(metadata, input_file, output_file, needed_property_files): - """Finalizes the metadata and signs an A/B OTA package. - - In order to stream an A/B OTA package, we need 'ota-streaming-property-files' - that contains the offsets and sizes for the ZIP entries. An example - property-files string is as follows. - - "payload.bin:679:343,payload_properties.txt:378:45,metadata:69:379" - - OTA server can pass down this string, in addition to the package URL, to the - system update client. System update client can then fetch individual ZIP - entries (ZIP_STORED) directly at the given offset of the URL. - - Args: - metadata: The metadata dict for the package. - input_file: The input ZIP filename that doesn't contain the package METADATA - entry yet. - output_file: The final output ZIP filename. - needed_property_files: The list of PropertyFiles' to be generated. - """ - - def ComputeAllPropertyFiles(input_file, needed_property_files): - # Write the current metadata entry with placeholders. - with zipfile.ZipFile(input_file) as input_zip: - for property_files in needed_property_files: - metadata[property_files.name] = property_files.Compute(input_zip) - namelist = input_zip.namelist() - - if METADATA_NAME in namelist: - common.ZipDelete(input_file, METADATA_NAME) - output_zip = zipfile.ZipFile(input_file, 'a') - WriteMetadata(metadata, output_zip) - common.ZipClose(output_zip) - - if OPTIONS.no_signing: - return input_file - - prelim_signing = common.MakeTempFile(suffix='.zip') - SignOutput(input_file, prelim_signing) - return prelim_signing - - def FinalizeAllPropertyFiles(prelim_signing, needed_property_files): - with zipfile.ZipFile(prelim_signing) as prelim_signing_zip: - for property_files in needed_property_files: - metadata[property_files.name] = property_files.Finalize( - prelim_signing_zip, len(metadata[property_files.name])) - - # SignOutput(), which in turn calls signapk.jar, will possibly reorder the ZIP - # entries, as well as padding the entry headers. We do a preliminary signing - # (with an incomplete metadata entry) to allow that to happen. Then compute - # the ZIP entry offsets, write back the final metadata and do the final - # signing. - prelim_signing = ComputeAllPropertyFiles(input_file, needed_property_files) - try: - FinalizeAllPropertyFiles(prelim_signing, needed_property_files) - except PropertyFiles.InsufficientSpaceException: - # Even with the preliminary signing, the entry orders may change - # dramatically, which leads to insufficiently reserved space during the - # first call to ComputeAllPropertyFiles(). In that case, we redo all the - # preliminary signing works, based on the already ordered ZIP entries, to - # address the issue. - prelim_signing = ComputeAllPropertyFiles( - prelim_signing, needed_property_files) - FinalizeAllPropertyFiles(prelim_signing, needed_property_files) - - # Replace the METADATA entry. - common.ZipDelete(prelim_signing, METADATA_NAME) - output_zip = zipfile.ZipFile(prelim_signing, 'a') - WriteMetadata(metadata, output_zip) - common.ZipClose(output_zip) - - # Re-sign the package after updating the metadata entry. - if OPTIONS.no_signing: - output_file = prelim_signing - else: - SignOutput(prelim_signing, output_file) - - # Reopen the final signed zip to double check the streaming metadata. - with zipfile.ZipFile(output_file) as output_zip: - for property_files in needed_property_files: - property_files.Verify(output_zip, metadata[property_files.name].strip()) - - # If requested, dump the metadata to a separate file. - output_metadata_path = OPTIONS.output_metadata_path - if output_metadata_path: - WriteMetadata(metadata, output_metadata_path) - - -def WriteBlockIncrementalOTAPackage(target_zip, source_zip, output_file): - target_info = common.BuildInfo(OPTIONS.target_info_dict, OPTIONS.oem_dicts) - source_info = common.BuildInfo(OPTIONS.source_info_dict, OPTIONS.oem_dicts) - - target_api_version = target_info["recovery_api_version"] - source_api_version = source_info["recovery_api_version"] - if source_api_version == 0: - logger.warning( - "Generating edify script for a source that can't install it.") - - script = edify_generator.EdifyGenerator( - source_api_version, target_info, fstab=source_info["fstab"]) - - if target_info.oem_props or source_info.oem_props: - if not OPTIONS.oem_no_mount: - source_info.WriteMountOemScript(script) - - metadata = GetPackageMetadata(target_info, source_info) - - if not OPTIONS.no_signing: - staging_file = common.MakeTempFile(suffix='.zip') - else: - staging_file = output_file - - output_zip = zipfile.ZipFile( - staging_file, "w", compression=zipfile.ZIP_DEFLATED) - - device_specific = common.DeviceSpecificParams( - source_zip=source_zip, - source_version=source_api_version, - source_tmp=OPTIONS.source_tmp, - target_zip=target_zip, - target_version=target_api_version, - target_tmp=OPTIONS.target_tmp, - output_zip=output_zip, - script=script, - metadata=metadata, - info_dict=source_info) - - source_boot = common.GetBootableImage( - "/tmp/boot.img", "boot.img", OPTIONS.source_tmp, "BOOT", source_info) - target_boot = common.GetBootableImage( - "/tmp/boot.img", "boot.img", OPTIONS.target_tmp, "BOOT", target_info) - updating_boot = (not OPTIONS.two_step and - (source_boot.data != target_boot.data)) - - target_recovery = common.GetBootableImage( - "/tmp/recovery.img", "recovery.img", OPTIONS.target_tmp, "RECOVERY") - - block_diff_dict = GetBlockDifferences(target_zip=target_zip, - source_zip=source_zip, - target_info=target_info, - source_info=source_info, - device_specific=device_specific) - - CheckVintfIfTrebleEnabled(OPTIONS.target_tmp, target_info) - - # Assertions (e.g. device properties check). - target_info.WriteDeviceAssertions(script, OPTIONS.oem_no_mount) - device_specific.IncrementalOTA_Assertions() - - # Two-step incremental package strategy (in chronological order, - # which is *not* the order in which the generated script has - # things): - # - # if stage is not "2/3" or "3/3": - # do verification on current system - # write recovery image to boot partition - # set stage to "2/3" - # reboot to boot partition and restart recovery - # else if stage is "2/3": - # write recovery image to recovery partition - # set stage to "3/3" - # reboot to recovery partition and restart recovery - # else: - # (stage must be "3/3") - # perform update: - # patch system files, etc. - # force full install of new boot image - # set up system to update recovery partition on first boot - # complete script normally - # (allow recovery to mark itself finished and reboot) - - if OPTIONS.two_step: - if not source_info.get("multistage_support"): - assert False, "two-step packages not supported by this build" - fs = source_info["fstab"]["/misc"] - assert fs.fs_type.upper() == "EMMC", \ - "two-step packages only supported on devices with EMMC /misc partitions" - bcb_dev = {"bcb_dev": fs.device} - common.ZipWriteStr(output_zip, "recovery.img", target_recovery.data) - script.AppendExtra(""" -if get_stage("%(bcb_dev)s") == "2/3" then -""" % bcb_dev) - - # Stage 2/3: Write recovery image to /recovery (currently running /boot). - script.Comment("Stage 2/3") - script.AppendExtra("sleep(20);\n") - script.WriteRawImage("/recovery", "recovery.img") - script.AppendExtra(""" -set_stage("%(bcb_dev)s", "3/3"); -reboot_now("%(bcb_dev)s", "recovery"); -else if get_stage("%(bcb_dev)s") != "3/3" then -""" % bcb_dev) - - # Stage 1/3: (a) Verify the current system. - script.Comment("Stage 1/3") - - # Dump fingerprints - script.Print("Source: {}".format(source_info.fingerprint)) - script.Print("Target: {}".format(target_info.fingerprint)) - - script.Print("Verifying current system...") - - device_specific.IncrementalOTA_VerifyBegin() - - WriteFingerprintAssertion(script, target_info, source_info) - - # Check the required cache size (i.e. stashed blocks). - required_cache_sizes = [diff.required_cache for diff in - block_diff_dict.values()] - if updating_boot: - boot_type, boot_device_expr = common.GetTypeAndDeviceExpr("/boot", - source_info) - d = common.Difference(target_boot, source_boot) - _, _, d = d.ComputePatch() - if d is None: - include_full_boot = True - common.ZipWriteStr(output_zip, "boot.img", target_boot.data) - else: - include_full_boot = False - - logger.info( - "boot target: %d source: %d diff: %d", target_boot.size, - source_boot.size, len(d)) - - common.ZipWriteStr(output_zip, "boot.img.p", d) - - target_expr = 'concat("{}:",{},":{}:{}")'.format( - boot_type, boot_device_expr, target_boot.size, target_boot.sha1) - source_expr = 'concat("{}:",{},":{}:{}")'.format( - boot_type, boot_device_expr, source_boot.size, source_boot.sha1) - script.PatchPartitionExprCheck(target_expr, source_expr) - - required_cache_sizes.append(target_boot.size) - - if required_cache_sizes: - script.CacheFreeSpaceCheck(max(required_cache_sizes)) - - # Verify the existing partitions. - for diff in block_diff_dict.values(): - diff.WriteVerifyScript(script, touched_blocks_only=True) - - device_specific.IncrementalOTA_VerifyEnd() - - if OPTIONS.two_step: - # Stage 1/3: (b) Write recovery image to /boot. - _WriteRecoveryImageToBoot(script, output_zip) - - script.AppendExtra(""" -set_stage("%(bcb_dev)s", "2/3"); -reboot_now("%(bcb_dev)s", ""); -else -""" % bcb_dev) - - # Stage 3/3: Make changes. - script.Comment("Stage 3/3") - - script.Comment("---- start making changes here ----") - - device_specific.IncrementalOTA_InstallBegin() - - progress_dict = {partition: 0.1 for partition in block_diff_dict} - progress_dict["system"] = 1 - len(block_diff_dict) * 0.1 - - if OPTIONS.source_info_dict.get("use_dynamic_partitions") == "true": - if OPTIONS.target_info_dict.get("use_dynamic_partitions") != "true": - raise RuntimeError( - "can't generate incremental that disables dynamic partitions") - dynamic_partitions_diff = common.DynamicPartitionsDifference( - info_dict=OPTIONS.target_info_dict, - source_info_dict=OPTIONS.source_info_dict, - block_diffs=block_diff_dict.values(), - progress_dict=progress_dict) - dynamic_partitions_diff.WriteScript( - script, output_zip, write_verify_script=OPTIONS.verify) - else: - for block_diff in block_diff_dict.values(): - block_diff.WriteScript(script, output_zip, - progress=progress_dict.get(block_diff.partition), - write_verify_script=OPTIONS.verify) - - if OPTIONS.two_step: - common.ZipWriteStr(output_zip, "boot.img", target_boot.data) - script.WriteRawImage("/boot", "boot.img") - logger.info("writing full boot image (forced by two-step mode)") - - if not OPTIONS.two_step: - if updating_boot: - if include_full_boot: - logger.info("boot image changed; including full.") - script.Print("Installing boot image...") - script.WriteRawImage("/boot", "boot.img") - else: - # Produce the boot image by applying a patch to the current - # contents of the boot partition, and write it back to the - # partition. - logger.info("boot image changed; including patch.") - script.Print("Patching boot image...") - script.ShowProgress(0.1, 10) - target_expr = 'concat("{}:",{},":{}:{}")'.format( - boot_type, boot_device_expr, target_boot.size, target_boot.sha1) - source_expr = 'concat("{}:",{},":{}:{}")'.format( - boot_type, boot_device_expr, source_boot.size, source_boot.sha1) - script.PatchPartitionExpr(target_expr, source_expr, '"boot.img.p"') - else: - logger.info("boot image unchanged; skipping.") - - # Do device-specific installation (eg, write radio image). - device_specific.IncrementalOTA_InstallEnd() - - if OPTIONS.extra_script is not None: - script.AppendExtra(OPTIONS.extra_script) - - if OPTIONS.wipe_user_data: - script.Print("Erasing user data...") - script.FormatPartition("/data") - - if OPTIONS.two_step: - script.AppendExtra(""" -set_stage("%(bcb_dev)s", ""); -endif; -endif; -""" % bcb_dev) - - script.SetProgress(1) - # For downgrade OTAs, we prefer to use the update-binary in the source - # build that is actually newer than the one in the target build. - if OPTIONS.downgrade: - script.AddToZip(source_zip, output_zip, input_path=OPTIONS.updater_binary) - else: - script.AddToZip(target_zip, output_zip, input_path=OPTIONS.updater_binary) - metadata["ota-required-cache"] = str(script.required_cache) - - # We haven't written the metadata entry yet, which will be handled in - # FinalizeMetadata(). - common.ZipClose(output_zip) - - # Sign the generated zip package unless no_signing is specified. - needed_property_files = ( - NonAbOtaPropertyFiles(), - ) - FinalizeMetadata(metadata, staging_file, output_file, needed_property_files) - - def GetTargetFilesZipForSecondaryImages(input_file, skip_postinstall=False): """Returns a target-files.zip file for generating secondary payload. @@ -1938,104 +919,6 @@ def GenerateAbOtaPackage(target_file, output_file, source_file=None): FinalizeMetadata(metadata, staging_file, output_file, needed_property_files) -def GenerateNonAbOtaPackage(target_file, output_file, source_file=None): - """Generates a non-A/B OTA package.""" - # Check the loaded info dicts first. - if OPTIONS.info_dict.get("no_recovery") == "true": - raise common.ExternalError( - "--- target build has specified no recovery ---") - - # Non-A/B OTAs rely on /cache partition to store temporary files. - cache_size = OPTIONS.info_dict.get("cache_size") - if cache_size is None: - logger.warning("--- can't determine the cache partition size ---") - OPTIONS.cache_size = cache_size - - if OPTIONS.extra_script is not None: - with open(OPTIONS.extra_script) as fp: - OPTIONS.extra_script = fp.read() - - if OPTIONS.extracted_input is not None: - OPTIONS.input_tmp = OPTIONS.extracted_input - else: - logger.info("unzipping target target-files...") - OPTIONS.input_tmp = common.UnzipTemp(target_file, UNZIP_PATTERN) - OPTIONS.target_tmp = OPTIONS.input_tmp - - # If the caller explicitly specified the device-specific extensions path via - # -s / --device_specific, use that. Otherwise, use META/releasetools.py if it - # is present in the target target_files. Otherwise, take the path of the file - # from 'tool_extensions' in the info dict and look for that in the local - # filesystem, relative to the current directory. - if OPTIONS.device_specific is None: - from_input = os.path.join(OPTIONS.input_tmp, "META", "releasetools.py") - if os.path.exists(from_input): - logger.info("(using device-specific extensions from target_files)") - OPTIONS.device_specific = from_input - else: - OPTIONS.device_specific = OPTIONS.info_dict.get("tool_extensions") - - if OPTIONS.device_specific is not None: - OPTIONS.device_specific = os.path.abspath(OPTIONS.device_specific) - - # Generate a full OTA. - if source_file is None: - with zipfile.ZipFile(target_file) as input_zip: - WriteFullOTAPackage( - input_zip, - output_file) - - # Generate an incremental OTA. - else: - logger.info("unzipping source target-files...") - OPTIONS.source_tmp = common.UnzipTemp( - OPTIONS.incremental_source, UNZIP_PATTERN) - with zipfile.ZipFile(target_file) as input_zip, \ - zipfile.ZipFile(source_file) as source_zip: - WriteBlockIncrementalOTAPackage( - input_zip, - source_zip, - output_file) - - -def CalculateRuntimeDevicesAndFingerprints(build_info, boot_variable_values): - """Returns a tuple of sets for runtime devices and fingerprints""" - - device_names = {build_info.device} - fingerprints = {build_info.fingerprint} - - if not boot_variable_values: - return device_names, fingerprints - - # Calculate all possible combinations of the values for the boot variables. - keys = boot_variable_values.keys() - value_list = boot_variable_values.values() - combinations = [dict(zip(keys, values)) - for values in itertools.product(*value_list)] - for placeholder_values in combinations: - # Reload the info_dict as some build properties may change their values - # based on the value of ro.boot* properties. - info_dict = copy.deepcopy(build_info.info_dict) - for partition in common.PARTITIONS_WITH_CARE_MAP: - partition_prop_key = "{}.build.prop".format(partition) - input_file = info_dict[partition_prop_key].input_file - if isinstance(input_file, zipfile.ZipFile): - with zipfile.ZipFile(input_file.filename) as input_zip: - info_dict[partition_prop_key] = \ - common.PartitionBuildProps.FromInputFile(input_zip, partition, - placeholder_values) - else: - info_dict[partition_prop_key] = \ - common.PartitionBuildProps.FromInputFile(input_file, partition, - placeholder_values) - info_dict["build.prop"] = info_dict["system.build.prop"] - - new_build_info = common.BuildInfo(info_dict, build_info.oem_dicts) - device_names.add(new_build_info.device) - fingerprints.add(new_build_info.fingerprint) - return device_names, fingerprints - - def main(argv): def option_handler(o, a): diff --git a/tools/releasetools/ota_utils.py b/tools/releasetools/ota_utils.py new file mode 100644 index 0000000000..874ab951c1 --- /dev/null +++ b/tools/releasetools/ota_utils.py @@ -0,0 +1,433 @@ +# Copyright (C) 2020 The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import copy +import itertools +import os +import zipfile + +from common import (ZipDelete, ZipClose, OPTIONS, MakeTempFile, + ZipWriteStr, BuildInfo, LoadDictionaryFromFile, + SignFile, PARTITIONS_WITH_CARE_MAP, PartitionBuildProps) + +METADATA_NAME = 'META-INF/com/android/metadata' +UNZIP_PATTERN = ['IMAGES/*', 'META/*', 'OTA/*', 'RADIO/*'] + + +def FinalizeMetadata(metadata, input_file, output_file, needed_property_files): + """Finalizes the metadata and signs an A/B OTA package. + + In order to stream an A/B OTA package, we need 'ota-streaming-property-files' + that contains the offsets and sizes for the ZIP entries. An example + property-files string is as follows. + + "payload.bin:679:343,payload_properties.txt:378:45,metadata:69:379" + + OTA server can pass down this string, in addition to the package URL, to the + system update client. System update client can then fetch individual ZIP + entries (ZIP_STORED) directly at the given offset of the URL. + + Args: + metadata: The metadata dict for the package. + input_file: The input ZIP filename that doesn't contain the package METADATA + entry yet. + output_file: The final output ZIP filename. + needed_property_files: The list of PropertyFiles' to be generated. + """ + + def ComputeAllPropertyFiles(input_file, needed_property_files): + # Write the current metadata entry with placeholders. + with zipfile.ZipFile(input_file) as input_zip: + for property_files in needed_property_files: + metadata[property_files.name] = property_files.Compute(input_zip) + namelist = input_zip.namelist() + + if METADATA_NAME in namelist: + ZipDelete(input_file, METADATA_NAME) + output_zip = zipfile.ZipFile(input_file, 'a') + WriteMetadata(metadata, output_zip) + ZipClose(output_zip) + + if OPTIONS.no_signing: + return input_file + + prelim_signing = MakeTempFile(suffix='.zip') + SignOutput(input_file, prelim_signing) + return prelim_signing + + def FinalizeAllPropertyFiles(prelim_signing, needed_property_files): + with zipfile.ZipFile(prelim_signing) as prelim_signing_zip: + for property_files in needed_property_files: + metadata[property_files.name] = property_files.Finalize( + prelim_signing_zip, len(metadata[property_files.name])) + + # SignOutput(), which in turn calls signapk.jar, will possibly reorder the ZIP + # entries, as well as padding the entry headers. We do a preliminary signing + # (with an incomplete metadata entry) to allow that to happen. Then compute + # the ZIP entry offsets, write back the final metadata and do the final + # signing. + prelim_signing = ComputeAllPropertyFiles(input_file, needed_property_files) + try: + FinalizeAllPropertyFiles(prelim_signing, needed_property_files) + except PropertyFiles.InsufficientSpaceException: + # Even with the preliminary signing, the entry orders may change + # dramatically, which leads to insufficiently reserved space during the + # first call to ComputeAllPropertyFiles(). In that case, we redo all the + # preliminary signing works, based on the already ordered ZIP entries, to + # address the issue. + prelim_signing = ComputeAllPropertyFiles( + prelim_signing, needed_property_files) + FinalizeAllPropertyFiles(prelim_signing, needed_property_files) + + # Replace the METADATA entry. + ZipDelete(prelim_signing, METADATA_NAME) + output_zip = zipfile.ZipFile(prelim_signing, 'a') + WriteMetadata(metadata, output_zip) + ZipClose(output_zip) + + # Re-sign the package after updating the metadata entry. + if OPTIONS.no_signing: + output_file = prelim_signing + else: + SignOutput(prelim_signing, output_file) + + # Reopen the final signed zip to double check the streaming metadata. + with zipfile.ZipFile(output_file) as output_zip: + for property_files in needed_property_files: + property_files.Verify(output_zip, metadata[property_files.name].strip()) + + # If requested, dump the metadata to a separate file. + output_metadata_path = OPTIONS.output_metadata_path + if output_metadata_path: + WriteMetadata(metadata, output_metadata_path) + + +def WriteMetadata(metadata, output): + """Writes the metadata to the zip archive or a file. + + Args: + metadata: The metadata dict for the package. + output: A ZipFile object or a string of the output file path. + """ + + value = "".join(["%s=%s\n" % kv for kv in sorted(metadata.items())]) + if isinstance(output, zipfile.ZipFile): + ZipWriteStr(output, METADATA_NAME, value, + compress_type=zipfile.ZIP_STORED) + return + + with open(output, 'w') as f: + f.write(value) + + +def GetPackageMetadata(target_info, source_info=None): + """Generates and returns the metadata dict. + + It generates a dict() that contains the info to be written into an OTA + package (META-INF/com/android/metadata). It also handles the detection of + downgrade / data wipe based on the global options. + + Args: + target_info: The BuildInfo instance that holds the target build info. + source_info: The BuildInfo instance that holds the source build info, or + None if generating full OTA. + + Returns: + A dict to be written into package metadata entry. + """ + assert isinstance(target_info, BuildInfo) + assert source_info is None or isinstance(source_info, BuildInfo) + + separator = '|' + + boot_variable_values = {} + if OPTIONS.boot_variable_file: + d = LoadDictionaryFromFile(OPTIONS.boot_variable_file) + for key, values in d.items(): + boot_variable_values[key] = [val.strip() for val in values.split(',')] + + post_build_devices, post_build_fingerprints = \ + CalculateRuntimeDevicesAndFingerprints(target_info, boot_variable_values) + metadata = { + 'post-build': separator.join(sorted(post_build_fingerprints)), + 'post-build-incremental': target_info.GetBuildProp( + 'ro.build.version.incremental'), + 'post-sdk-level': target_info.GetBuildProp( + 'ro.build.version.sdk'), + 'post-security-patch-level': target_info.GetBuildProp( + 'ro.build.version.security_patch'), + } + + if target_info.is_ab and not OPTIONS.force_non_ab: + metadata['ota-type'] = 'AB' + metadata['ota-required-cache'] = '0' + else: + metadata['ota-type'] = 'BLOCK' + + if OPTIONS.wipe_user_data: + metadata['ota-wipe'] = 'yes' + + if OPTIONS.retrofit_dynamic_partitions: + metadata['ota-retrofit-dynamic-partitions'] = 'yes' + + is_incremental = source_info is not None + if is_incremental: + pre_build_devices, pre_build_fingerprints = \ + CalculateRuntimeDevicesAndFingerprints(source_info, + boot_variable_values) + metadata['pre-build'] = separator.join(sorted(pre_build_fingerprints)) + metadata['pre-build-incremental'] = source_info.GetBuildProp( + 'ro.build.version.incremental') + metadata['pre-device'] = separator.join(sorted(pre_build_devices)) + else: + metadata['pre-device'] = separator.join(sorted(post_build_devices)) + + # Use the actual post-timestamp, even for a downgrade case. + metadata['post-timestamp'] = target_info.GetBuildProp('ro.build.date.utc') + + # Detect downgrades and set up downgrade flags accordingly. + if is_incremental: + HandleDowngradeMetadata(metadata, target_info, source_info) + + return metadata + + +def HandleDowngradeMetadata(metadata, target_info, source_info): + # Only incremental OTAs are allowed to reach here. + assert OPTIONS.incremental_source is not None + + post_timestamp = target_info.GetBuildProp("ro.build.date.utc") + pre_timestamp = source_info.GetBuildProp("ro.build.date.utc") + is_downgrade = int(post_timestamp) < int(pre_timestamp) + + if OPTIONS.downgrade: + if not is_downgrade: + raise RuntimeError( + "--downgrade or --override_timestamp specified but no downgrade " + "detected: pre: %s, post: %s" % (pre_timestamp, post_timestamp)) + metadata["ota-downgrade"] = "yes" + else: + if is_downgrade: + raise RuntimeError( + "Downgrade detected based on timestamp check: pre: %s, post: %s. " + "Need to specify --override_timestamp OR --downgrade to allow " + "building the incremental." % (pre_timestamp, post_timestamp)) + + +def CalculateRuntimeDevicesAndFingerprints(build_info, boot_variable_values): + """Returns a tuple of sets for runtime devices and fingerprints""" + + device_names = {build_info.device} + fingerprints = {build_info.fingerprint} + + if not boot_variable_values: + return device_names, fingerprints + + # Calculate all possible combinations of the values for the boot variables. + keys = boot_variable_values.keys() + value_list = boot_variable_values.values() + combinations = [dict(zip(keys, values)) + for values in itertools.product(*value_list)] + for placeholder_values in combinations: + # Reload the info_dict as some build properties may change their values + # based on the value of ro.boot* properties. + info_dict = copy.deepcopy(build_info.info_dict) + for partition in PARTITIONS_WITH_CARE_MAP: + partition_prop_key = "{}.build.prop".format(partition) + input_file = info_dict[partition_prop_key].input_file + if isinstance(input_file, zipfile.ZipFile): + with zipfile.ZipFile(input_file.filename) as input_zip: + info_dict[partition_prop_key] = \ + PartitionBuildProps.FromInputFile(input_zip, partition, + placeholder_values) + else: + info_dict[partition_prop_key] = \ + PartitionBuildProps.FromInputFile(input_file, partition, + placeholder_values) + info_dict["build.prop"] = info_dict["system.build.prop"] + + new_build_info = BuildInfo(info_dict, build_info.oem_dicts) + device_names.add(new_build_info.device) + fingerprints.add(new_build_info.fingerprint) + return device_names, fingerprints + + +class PropertyFiles(object): + """A class that computes the property-files string for an OTA package. + + A property-files string is a comma-separated string that contains the + offset/size info for an OTA package. The entries, which must be ZIP_STORED, + can be fetched directly with the package URL along with the offset/size info. + These strings can be used for streaming A/B OTAs, or allowing an updater to + download package metadata entry directly, without paying the cost of + downloading entire package. + + Computing the final property-files string requires two passes. Because doing + the whole package signing (with signapk.jar) will possibly reorder the ZIP + entries, which may in turn invalidate earlier computed ZIP entry offset/size + values. + + This class provides functions to be called for each pass. The general flow is + as follows. + + property_files = PropertyFiles() + # The first pass, which writes placeholders before doing initial signing. + property_files.Compute() + SignOutput() + + # The second pass, by replacing the placeholders with actual data. + property_files.Finalize() + SignOutput() + + And the caller can additionally verify the final result. + + property_files.Verify() + """ + + def __init__(self): + self.name = None + self.required = () + self.optional = () + + def Compute(self, input_zip): + """Computes and returns a property-files string with placeholders. + + We reserve extra space for the offset and size of the metadata entry itself, + although we don't know the final values until the package gets signed. + + Args: + input_zip: The input ZIP file. + + Returns: + A string with placeholders for the metadata offset/size info, e.g. + "payload.bin:679:343,payload_properties.txt:378:45,metadata: ". + """ + return self.GetPropertyFilesString(input_zip, reserve_space=True) + + class InsufficientSpaceException(Exception): + pass + + def Finalize(self, input_zip, reserved_length): + """Finalizes a property-files string with actual METADATA offset/size info. + + The input ZIP file has been signed, with the ZIP entries in the desired + place (signapk.jar will possibly reorder the ZIP entries). Now we compute + the ZIP entry offsets and construct the property-files string with actual + data. Note that during this process, we must pad the property-files string + to the reserved length, so that the METADATA entry size remains the same. + Otherwise the entries' offsets and sizes may change again. + + Args: + input_zip: The input ZIP file. + reserved_length: The reserved length of the property-files string during + the call to Compute(). The final string must be no more than this + size. + + Returns: + A property-files string including the metadata offset/size info, e.g. + "payload.bin:679:343,payload_properties.txt:378:45,metadata:69:379 ". + + Raises: + InsufficientSpaceException: If the reserved length is insufficient to hold + the final string. + """ + result = self.GetPropertyFilesString(input_zip, reserve_space=False) + if len(result) > reserved_length: + raise self.InsufficientSpaceException( + 'Insufficient reserved space: reserved={}, actual={}'.format( + reserved_length, len(result))) + + result += ' ' * (reserved_length - len(result)) + return result + + def Verify(self, input_zip, expected): + """Verifies the input ZIP file contains the expected property-files string. + + Args: + input_zip: The input ZIP file. + expected: The property-files string that's computed from Finalize(). + + Raises: + AssertionError: On finding a mismatch. + """ + actual = self.GetPropertyFilesString(input_zip) + assert actual == expected, \ + "Mismatching streaming metadata: {} vs {}.".format(actual, expected) + + def GetPropertyFilesString(self, zip_file, reserve_space=False): + """ + Constructs the property-files string per request. + + Args: + zip_file: The input ZIP file. + reserved_length: The reserved length of the property-files string. + + Returns: + A property-files string including the metadata offset/size info, e.g. + "payload.bin:679:343,payload_properties.txt:378:45,metadata: ". + """ + + def ComputeEntryOffsetSize(name): + """Computes the zip entry offset and size.""" + info = zip_file.getinfo(name) + offset = info.header_offset + offset += zipfile.sizeFileHeader + offset += len(info.extra) + len(info.filename) + size = info.file_size + return '%s:%d:%d' % (os.path.basename(name), offset, size) + + tokens = [] + tokens.extend(self._GetPrecomputed(zip_file)) + for entry in self.required: + tokens.append(ComputeEntryOffsetSize(entry)) + for entry in self.optional: + if entry in zip_file.namelist(): + tokens.append(ComputeEntryOffsetSize(entry)) + + # 'META-INF/com/android/metadata' is required. We don't know its actual + # offset and length (as well as the values for other entries). So we reserve + # 15-byte as a placeholder ('offset:length'), which is sufficient to cover + # the space for metadata entry. Because 'offset' allows a max of 10-digit + # (i.e. ~9 GiB), with a max of 4-digit for the length. Note that all the + # reserved space serves the metadata entry only. + if reserve_space: + tokens.append('metadata:' + ' ' * 15) + else: + tokens.append(ComputeEntryOffsetSize(METADATA_NAME)) + + return ','.join(tokens) + + def _GetPrecomputed(self, input_zip): + """Computes the additional tokens to be included into the property-files. + + This applies to tokens without actual ZIP entries, such as + payload_metadata.bin. We want to expose the offset/size to updaters, so + that they can download the payload metadata directly with the info. + + Args: + input_zip: The input zip file. + + Returns: + A list of strings (tokens) to be added to the property-files string. + """ + # pylint: disable=no-self-use + # pylint: disable=unused-argument + return [] + + +def SignOutput(temp_zip_name, output_zip_name): + pw = OPTIONS.key_passwords[OPTIONS.package_key] + + SignFile(temp_zip_name, output_zip_name, OPTIONS.package_key, pw, + whole_file=True) diff --git a/tools/releasetools/test_non_ab_ota.py b/tools/releasetools/test_non_ab_ota.py new file mode 100644 index 0000000000..ee1b4113c9 --- /dev/null +++ b/tools/releasetools/test_non_ab_ota.py @@ -0,0 +1,169 @@ +# +# Copyright (C) 2020 The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import copy +import zipfile + +import common +import test_utils + +from non_ab_ota import NonAbOtaPropertyFiles, WriteFingerprintAssertion +from test_utils import PropertyFilesTestCase + + +class NonAbOtaPropertyFilesTest(PropertyFilesTestCase): + """Additional validity checks specialized for NonAbOtaPropertyFiles.""" + def setUp(self): + common.OPTIONS.no_signing = False + def test_init(self): + property_files = NonAbOtaPropertyFiles() + self.assertEqual('ota-property-files', property_files.name) + self.assertEqual((), property_files.required) + self.assertEqual((), property_files.optional) + + def test_Compute(self): + entries = () + zip_file = self.construct_zip_package(entries) + property_files = NonAbOtaPropertyFiles() + with zipfile.ZipFile(zip_file) as zip_fp: + property_files_string = property_files.Compute(zip_fp) + + tokens = self._parse_property_files_string(property_files_string) + self.assertEqual(1, len(tokens)) + self._verify_entries(zip_file, tokens, entries) + + def test_Finalize(self): + entries = [ + 'META-INF/com/android/metadata', + ] + zip_file = self.construct_zip_package(entries) + property_files = NonAbOtaPropertyFiles() + with zipfile.ZipFile(zip_file) as zip_fp: + raw_metadata = property_files.GetPropertyFilesString( + zip_fp, reserve_space=False) + property_files_string = property_files.Finalize(zip_fp, len(raw_metadata)) + tokens = self._parse_property_files_string(property_files_string) + + self.assertEqual(1, len(tokens)) + # 'META-INF/com/android/metadata' will be key'd as 'metadata'. + entries[0] = 'metadata' + self._verify_entries(zip_file, tokens, entries) + + def test_Verify(self): + entries = ( + 'META-INF/com/android/metadata', + ) + zip_file = self.construct_zip_package(entries) + property_files = NonAbOtaPropertyFiles() + with zipfile.ZipFile(zip_file) as zip_fp: + raw_metadata = property_files.GetPropertyFilesString( + zip_fp, reserve_space=False) + + property_files.Verify(zip_fp, raw_metadata) + +class NonAbOTATest(test_utils.ReleaseToolsTestCase): + TEST_TARGET_INFO_DICT = { + 'build.prop': common.PartitionBuildProps.FromDictionary( + 'system', { + 'ro.product.device': 'product-device', + 'ro.build.fingerprint': 'build-fingerprint-target', + 'ro.build.version.incremental': 'build-version-incremental-target', + 'ro.build.version.sdk': '27', + 'ro.build.version.security_patch': '2017-12-01', + 'ro.build.date.utc': '1500000000'} + ) + } + TEST_INFO_DICT_USES_OEM_PROPS = { + 'build.prop': common.PartitionBuildProps.FromDictionary( + 'system', { + 'ro.product.name': 'product-name', + 'ro.build.thumbprint': 'build-thumbprint', + 'ro.build.bar': 'build-bar'} + ), + 'vendor.build.prop': common.PartitionBuildProps.FromDictionary( + 'vendor', { + 'ro.vendor.build.fingerprint': 'vendor-build-fingerprint'} + ), + 'property1': 'value1', + 'property2': 4096, + 'oem_fingerprint_properties': 'ro.product.device ro.product.brand', + } + TEST_OEM_DICTS = [ + { + 'ro.product.brand': 'brand1', + 'ro.product.device': 'device1', + }, + { + 'ro.product.brand': 'brand2', + 'ro.product.device': 'device2', + }, + { + 'ro.product.brand': 'brand3', + 'ro.product.device': 'device3', + }, + ] + def test_WriteFingerprintAssertion_without_oem_props(self): + target_info = common.BuildInfo(self.TEST_TARGET_INFO_DICT, None) + source_info_dict = copy.deepcopy(self.TEST_TARGET_INFO_DICT) + source_info_dict['build.prop'].build_props['ro.build.fingerprint'] = ( + 'source-build-fingerprint') + source_info = common.BuildInfo(source_info_dict, None) + + script_writer = test_utils.MockScriptWriter() + WriteFingerprintAssertion(script_writer, target_info, source_info) + self.assertEqual( + [('AssertSomeFingerprint', 'source-build-fingerprint', + 'build-fingerprint-target')], + script_writer.lines) + + def test_WriteFingerprintAssertion_with_source_oem_props(self): + target_info = common.BuildInfo(self.TEST_TARGET_INFO_DICT, None) + source_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS, + self.TEST_OEM_DICTS) + + script_writer = test_utils.MockScriptWriter() + WriteFingerprintAssertion(script_writer, target_info, source_info) + self.assertEqual( + [('AssertFingerprintOrThumbprint', 'build-fingerprint-target', + 'build-thumbprint')], + script_writer.lines) + + def test_WriteFingerprintAssertion_with_target_oem_props(self): + target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS, + self.TEST_OEM_DICTS) + source_info = common.BuildInfo(self.TEST_TARGET_INFO_DICT, None) + + script_writer = test_utils.MockScriptWriter() + WriteFingerprintAssertion(script_writer, target_info, source_info) + self.assertEqual( + [('AssertFingerprintOrThumbprint', 'build-fingerprint-target', + 'build-thumbprint')], + script_writer.lines) + + def test_WriteFingerprintAssertion_with_both_oem_props(self): + target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS, + self.TEST_OEM_DICTS) + source_info_dict = copy.deepcopy(self.TEST_INFO_DICT_USES_OEM_PROPS) + source_info_dict['build.prop'].build_props['ro.build.thumbprint'] = ( + 'source-build-thumbprint') + source_info = common.BuildInfo(source_info_dict, self.TEST_OEM_DICTS) + + script_writer = test_utils.MockScriptWriter() + WriteFingerprintAssertion(script_writer, target_info, source_info) + self.assertEqual( + [('AssertSomeThumbprint', 'build-thumbprint', + 'source-build-thumbprint')], + script_writer.lines) diff --git a/tools/releasetools/test_ota_from_target_files.py b/tools/releasetools/test_ota_from_target_files.py index 07b2e05e4c..52aa487031 100644 --- a/tools/releasetools/test_ota_from_target_files.py +++ b/tools/releasetools/test_ota_from_target_files.py @@ -21,14 +21,15 @@ import zipfile import common import test_utils +from ota_utils import CalculateRuntimeDevicesAndFingerprints from ota_from_target_files import ( _LoadOemDicts, AbOtaPropertyFiles, FinalizeMetadata, GetPackageMetadata, GetTargetFilesZipForSecondaryImages, - GetTargetFilesZipWithoutPostinstallConfig, NonAbOtaPropertyFiles, + GetTargetFilesZipWithoutPostinstallConfig, Payload, PayloadSigner, POSTINSTALL_CONFIG, PropertyFiles, - StreamingPropertyFiles, WriteFingerprintAssertion, - CalculateRuntimeDevicesAndFingerprints) - + StreamingPropertyFiles) +from non_ab_ota import NonAbOtaPropertyFiles +from test_utils import PropertyFilesTestCase def construct_target_files(secondary=False): """Returns a target-files.zip file for generating OTA packages.""" @@ -149,20 +150,6 @@ class OtaFromTargetFilesTest(test_utils.ReleaseToolsTestCase): 'oem_fingerprint_properties': 'ro.product.device ro.product.brand', } - TEST_OEM_DICTS = [ - { - 'ro.product.brand': 'brand1', - 'ro.product.device': 'device1', - }, - { - 'ro.product.brand': 'brand2', - 'ro.product.device': 'device2', - }, - { - 'ro.product.brand': 'brand3', - 'ro.product.device': 'device3', - }, - ] def setUp(self): self.testdata_dir = test_utils.get_testdata_dir() @@ -529,59 +516,6 @@ class OtaFromTargetFilesTest(test_utils.ReleaseToolsTestCase): FinalizeMetadata(metadata, zip_file, output_file, needed_property_files) self.assertIn('ota-test-property-files', metadata) - def test_WriteFingerprintAssertion_without_oem_props(self): - target_info = common.BuildInfo(self.TEST_TARGET_INFO_DICT, None) - source_info_dict = copy.deepcopy(self.TEST_TARGET_INFO_DICT) - source_info_dict['build.prop'].build_props['ro.build.fingerprint'] = ( - 'source-build-fingerprint') - source_info = common.BuildInfo(source_info_dict, None) - - script_writer = test_utils.MockScriptWriter() - WriteFingerprintAssertion(script_writer, target_info, source_info) - self.assertEqual( - [('AssertSomeFingerprint', 'source-build-fingerprint', - 'build-fingerprint-target')], - script_writer.lines) - - def test_WriteFingerprintAssertion_with_source_oem_props(self): - target_info = common.BuildInfo(self.TEST_TARGET_INFO_DICT, None) - source_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS, - self.TEST_OEM_DICTS) - - script_writer = test_utils.MockScriptWriter() - WriteFingerprintAssertion(script_writer, target_info, source_info) - self.assertEqual( - [('AssertFingerprintOrThumbprint', 'build-fingerprint-target', - 'build-thumbprint')], - script_writer.lines) - - def test_WriteFingerprintAssertion_with_target_oem_props(self): - target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS, - self.TEST_OEM_DICTS) - source_info = common.BuildInfo(self.TEST_TARGET_INFO_DICT, None) - - script_writer = test_utils.MockScriptWriter() - WriteFingerprintAssertion(script_writer, target_info, source_info) - self.assertEqual( - [('AssertFingerprintOrThumbprint', 'build-fingerprint-target', - 'build-thumbprint')], - script_writer.lines) - - def test_WriteFingerprintAssertion_with_both_oem_props(self): - target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS, - self.TEST_OEM_DICTS) - source_info_dict = copy.deepcopy(self.TEST_INFO_DICT_USES_OEM_PROPS) - source_info_dict['build.prop'].build_props['ro.build.thumbprint'] = ( - 'source-build-thumbprint') - source_info = common.BuildInfo(source_info_dict, self.TEST_OEM_DICTS) - - script_writer = test_utils.MockScriptWriter() - WriteFingerprintAssertion(script_writer, target_info, source_info) - self.assertEqual( - [('AssertSomeThumbprint', 'build-thumbprint', - 'source-build-thumbprint')], - script_writer.lines) - class TestPropertyFiles(PropertyFiles): """A class that extends PropertyFiles for testing purpose.""" @@ -598,41 +532,8 @@ class TestPropertyFiles(PropertyFiles): 'optional-entry2', ) +class PropertyFilesTest(PropertyFilesTestCase): -class PropertyFilesTest(test_utils.ReleaseToolsTestCase): - - def setUp(self): - common.OPTIONS.no_signing = False - - @staticmethod - def construct_zip_package(entries): - zip_file = common.MakeTempFile(suffix='.zip') - with zipfile.ZipFile(zip_file, 'w') as zip_fp: - for entry in entries: - zip_fp.writestr( - entry, - entry.replace('.', '-').upper(), - zipfile.ZIP_STORED) - return zip_file - - @staticmethod - def _parse_property_files_string(data): - result = {} - for token in data.split(','): - name, info = token.split(':', 1) - result[name] = info - return result - - def _verify_entries(self, input_file, tokens, entries): - for entry in entries: - offset, size = map(int, tokens[entry].split(':')) - with open(input_file, 'rb') as input_fp: - input_fp.seek(offset) - if entry == 'metadata': - expected = b'META-INF/COM/ANDROID/METADATA' - else: - expected = entry.replace('.', '-').upper().encode() - self.assertEqual(expected, input_fp.read(size)) @test_utils.SkipIfExternalToolsUnavailable() def test_Compute(self): @@ -753,7 +654,7 @@ class PropertyFilesTest(test_utils.ReleaseToolsTestCase): AssertionError, property_files.Verify, zip_fp, raw_metadata + 'x') -class StreamingPropertyFilesTest(PropertyFilesTest): +class StreamingPropertyFilesTest(PropertyFilesTestCase): """Additional validity checks specialized for StreamingPropertyFiles.""" def test_init(self): @@ -834,7 +735,7 @@ class StreamingPropertyFilesTest(PropertyFilesTest): AssertionError, property_files.Verify, zip_fp, raw_metadata + 'x') -class AbOtaPropertyFilesTest(PropertyFilesTest): +class AbOtaPropertyFilesTest(PropertyFilesTestCase): """Additional validity checks specialized for AbOtaPropertyFiles.""" # The size for payload and metadata signature size. @@ -1002,56 +903,6 @@ class AbOtaPropertyFilesTest(PropertyFilesTest): property_files.Verify(zip_fp, raw_metadata) -class NonAbOtaPropertyFilesTest(PropertyFilesTest): - """Additional validity checks specialized for NonAbOtaPropertyFiles.""" - - def test_init(self): - property_files = NonAbOtaPropertyFiles() - self.assertEqual('ota-property-files', property_files.name) - self.assertEqual((), property_files.required) - self.assertEqual((), property_files.optional) - - def test_Compute(self): - entries = () - zip_file = self.construct_zip_package(entries) - property_files = NonAbOtaPropertyFiles() - with zipfile.ZipFile(zip_file) as zip_fp: - property_files_string = property_files.Compute(zip_fp) - - tokens = self._parse_property_files_string(property_files_string) - self.assertEqual(1, len(tokens)) - self._verify_entries(zip_file, tokens, entries) - - def test_Finalize(self): - entries = [ - 'META-INF/com/android/metadata', - ] - zip_file = self.construct_zip_package(entries) - property_files = NonAbOtaPropertyFiles() - with zipfile.ZipFile(zip_file) as zip_fp: - raw_metadata = property_files.GetPropertyFilesString( - zip_fp, reserve_space=False) - property_files_string = property_files.Finalize(zip_fp, len(raw_metadata)) - tokens = self._parse_property_files_string(property_files_string) - - self.assertEqual(1, len(tokens)) - # 'META-INF/com/android/metadata' will be key'd as 'metadata'. - entries[0] = 'metadata' - self._verify_entries(zip_file, tokens, entries) - - def test_Verify(self): - entries = ( - 'META-INF/com/android/metadata', - ) - zip_file = self.construct_zip_package(entries) - property_files = NonAbOtaPropertyFiles() - with zipfile.ZipFile(zip_file) as zip_fp: - raw_metadata = property_files.GetPropertyFilesString( - zip_fp, reserve_space=False) - - property_files.Verify(zip_fp, raw_metadata) - - class PayloadSignerTest(test_utils.ReleaseToolsTestCase): SIGFILE = 'sigfile.bin' diff --git a/tools/releasetools/test_utils.py b/tools/releasetools/test_utils.py index e99975765b..65092d84de 100755 --- a/tools/releasetools/test_utils.py +++ b/tools/releasetools/test_utils.py @@ -25,6 +25,7 @@ import os.path import struct import sys import unittest +import zipfile import common @@ -192,6 +193,41 @@ class ReleaseToolsTestCase(unittest.TestCase): def tearDown(self): common.Cleanup() +class PropertyFilesTestCase(ReleaseToolsTestCase): + + @staticmethod + def construct_zip_package(entries): + zip_file = common.MakeTempFile(suffix='.zip') + with zipfile.ZipFile(zip_file, 'w') as zip_fp: + for entry in entries: + zip_fp.writestr( + entry, + entry.replace('.', '-').upper(), + zipfile.ZIP_STORED) + return zip_file + + @staticmethod + def _parse_property_files_string(data): + result = {} + for token in data.split(','): + name, info = token.split(':', 1) + result[name] = info + return result + + def setUp(self): + common.OPTIONS.no_signing = False + + def _verify_entries(self, input_file, tokens, entries): + for entry in entries: + offset, size = map(int, tokens[entry].split(':')) + with open(input_file, 'rb') as input_fp: + input_fp.seek(offset) + if entry == 'metadata': + expected = b'META-INF/COM/ANDROID/METADATA' + else: + expected = entry.replace('.', '-').upper().encode() + self.assertEqual(expected, input_fp.read(size)) + if __name__ == '__main__': testsuite = unittest.TestLoader().discover( |