aboutsummaryrefslogtreecommitdiffstats
path: root/tools
diff options
context:
space:
mode:
authorKelvin Zhang <zhangkelvin@google.com>2020-07-29 16:37:51 -0400
committerKelvin Zhang <zhangkelvin@google.com>2020-08-10 16:22:22 -0400
commitcff4d7606dd195d8c2726867024b310a23a4ac8b (patch)
tree4926781b6ccd866d95776c7e202ee8314dba7b94 /tools
parent3443517fcbb245d567d7c87a53852ad4703dde08 (diff)
downloadplatform_build-cff4d7606dd195d8c2726867024b310a23a4ac8b.tar.gz
platform_build-cff4d7606dd195d8c2726867024b310a23a4ac8b.tar.bz2
platform_build-cff4d7606dd195d8c2726867024b310a23a4ac8b.zip
Move non-AB OTA generation code to a separate file
Test: Generate a non-AB OTA, apply it Change-Id: I2f1afbe70d17356fcbf4d59901d201a76a3d6c4f
Diffstat (limited to 'tools')
-rw-r--r--tools/releasetools/Android.bp2
-rwxr-xr-xtools/releasetools/check_target_files_vintf.py46
-rw-r--r--tools/releasetools/common.py2
-rw-r--r--tools/releasetools/non_ab_ota.py684
-rwxr-xr-xtools/releasetools/ota_from_target_files.py1127
-rw-r--r--tools/releasetools/ota_utils.py433
-rw-r--r--tools/releasetools/test_non_ab_ota.py169
-rw-r--r--tools/releasetools/test_ota_from_target_files.py165
-rwxr-xr-xtools/releasetools/test_utils.py36
9 files changed, 1384 insertions, 1280 deletions
diff --git a/tools/releasetools/Android.bp b/tools/releasetools/Android.bp
index 11f92abb17..0ca2b3791c 100644
--- a/tools/releasetools/Android.bp
+++ b/tools/releasetools/Android.bp
@@ -93,7 +93,9 @@ python_defaults {
srcs: [
"edify_generator.py",
"ota_from_target_files.py",
+ "non_ab_ota.py",
"target_files_diff.py",
+ "ota_utils.py",
],
libs: [
"releasetools_check_target_files_vintf",
diff --git a/tools/releasetools/check_target_files_vintf.py b/tools/releasetools/check_target_files_vintf.py
index ef66112355..0edefac9c1 100755
--- a/tools/releasetools/check_target_files_vintf.py
+++ b/tools/releasetools/check_target_files_vintf.py
@@ -220,6 +220,52 @@ def CheckVintf(inp, info_dict=None):
raise ValueError('{} is not a valid directory or zip file'.format(inp))
+def CheckVintfIfTrebleEnabled(target_files, target_info):
+ """Checks compatibility info of the input target files.
+
+ Metadata used for compatibility verification is retrieved from target_zip.
+
+ Compatibility should only be checked for devices that have enabled
+ Treble support.
+
+ Args:
+ target_files: Path to zip file containing the source files to be included
+ for OTA. Can also be the path to extracted directory.
+ target_info: The BuildInfo instance that holds the target build info.
+ """
+
+ # Will only proceed if the target has enabled the Treble support (as well as
+ # having a /vendor partition).
+ if not HasTrebleEnabled(target_files, target_info):
+ return
+
+ # Skip adding the compatibility package as a workaround for b/114240221. The
+ # compatibility will always fail on devices without qualified kernels.
+ if OPTIONS.skip_compatibility_check:
+ return
+
+ if not CheckVintf(target_files, target_info):
+ raise RuntimeError("VINTF compatibility check failed")
+
+def HasTrebleEnabled(target_files, target_info):
+ def HasVendorPartition(target_files):
+ if os.path.isdir(target_files):
+ return os.path.isdir(os.path.join(target_files, "VENDOR"))
+ if zipfile.is_zipfile(target_files):
+ return HasPartition(zipfile.ZipFile(target_files), "vendor")
+ raise ValueError("Unknown target_files argument")
+
+ return (HasVendorPartition(target_files) and
+ target_info.GetBuildProp("ro.treble.enabled") == "true")
+
+
+def HasPartition(target_files_zip, partition):
+ try:
+ target_files_zip.getinfo(partition.upper() + "/")
+ return True
+ except KeyError:
+ return False
+
def main(argv):
args = common.ParseOptions(argv, __doc__)
diff --git a/tools/releasetools/common.py b/tools/releasetools/common.py
index 1846a67de1..89900d3eea 100644
--- a/tools/releasetools/common.py
+++ b/tools/releasetools/common.py
@@ -1227,7 +1227,7 @@ def _MakeRamdisk(sourcedir, fs_config_file=None, lz4_ramdisks=False):
cmd = ["mkbootfs", os.path.join(sourcedir, "RAMDISK")]
p1 = Run(cmd, stdout=subprocess.PIPE)
if lz4_ramdisks:
- p2 = Run(["lz4", "-l", "-12" , "--favor-decSpeed"], stdin=p1.stdout,
+ p2 = Run(["lz4", "-l", "-12", "--favor-decSpeed"], stdin=p1.stdout,
stdout=ramdisk_img.file.fileno())
else:
p2 = Run(["minigzip"], stdin=p1.stdout, stdout=ramdisk_img.file.fileno())
diff --git a/tools/releasetools/non_ab_ota.py b/tools/releasetools/non_ab_ota.py
new file mode 100644
index 0000000000..3a8795798b
--- /dev/null
+++ b/tools/releasetools/non_ab_ota.py
@@ -0,0 +1,684 @@
+# Copyright (C) 2020 The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import collections
+import logging
+import os
+import zipfile
+
+import common
+import edify_generator
+import verity_utils
+from check_target_files_vintf import CheckVintfIfTrebleEnabled, HasPartition
+from common import OPTIONS
+from ota_utils import UNZIP_PATTERN, FinalizeMetadata, GetPackageMetadata, PropertyFiles
+
+logger = logging.getLogger(__name__)
+
+
+def GetBlockDifferences(target_zip, source_zip, target_info, source_info,
+ device_specific):
+ """Returns a ordered dict of block differences with partition name as key."""
+
+ def GetIncrementalBlockDifferenceForPartition(name):
+ if not HasPartition(source_zip, name):
+ raise RuntimeError(
+ "can't generate incremental that adds {}".format(name))
+
+ partition_src = common.GetUserImage(name, OPTIONS.source_tmp, source_zip,
+ info_dict=source_info,
+ allow_shared_blocks=allow_shared_blocks)
+
+ hashtree_info_generator = verity_utils.CreateHashtreeInfoGenerator(
+ name, 4096, target_info)
+ partition_tgt = common.GetUserImage(name, OPTIONS.target_tmp, target_zip,
+ info_dict=target_info,
+ allow_shared_blocks=allow_shared_blocks,
+ hashtree_info_generator=hashtree_info_generator)
+
+ # Check the first block of the source system partition for remount R/W only
+ # if the filesystem is ext4.
+ partition_source_info = source_info["fstab"]["/" + name]
+ check_first_block = partition_source_info.fs_type == "ext4"
+ # Disable using imgdiff for squashfs. 'imgdiff -z' expects input files to be
+ # in zip formats. However with squashfs, a) all files are compressed in LZ4;
+ # b) the blocks listed in block map may not contain all the bytes for a
+ # given file (because they're rounded to be 4K-aligned).
+ partition_target_info = target_info["fstab"]["/" + name]
+ disable_imgdiff = (partition_source_info.fs_type == "squashfs" or
+ partition_target_info.fs_type == "squashfs")
+ return common.BlockDifference(name, partition_tgt, partition_src,
+ check_first_block,
+ version=blockimgdiff_version,
+ disable_imgdiff=disable_imgdiff)
+
+ if source_zip:
+ # See notes in common.GetUserImage()
+ allow_shared_blocks = (source_info.get('ext4_share_dup_blocks') == "true" or
+ target_info.get('ext4_share_dup_blocks') == "true")
+ blockimgdiff_version = max(
+ int(i) for i in target_info.get(
+ "blockimgdiff_versions", "1").split(","))
+ assert blockimgdiff_version >= 3
+
+ block_diff_dict = collections.OrderedDict()
+ partition_names = ["system", "vendor", "product", "odm", "system_ext",
+ "vendor_dlkm", "odm_dlkm"]
+ for partition in partition_names:
+ if not HasPartition(target_zip, partition):
+ continue
+ # Full OTA update.
+ if not source_zip:
+ tgt = common.GetUserImage(partition, OPTIONS.input_tmp, target_zip,
+ info_dict=target_info,
+ reset_file_map=True)
+ block_diff_dict[partition] = common.BlockDifference(partition, tgt,
+ src=None)
+ # Incremental OTA update.
+ else:
+ block_diff_dict[partition] = GetIncrementalBlockDifferenceForPartition(
+ partition)
+ assert "system" in block_diff_dict
+
+ # Get the block diffs from the device specific script. If there is a
+ # duplicate block diff for a partition, ignore the diff in the generic script
+ # and use the one in the device specific script instead.
+ if source_zip:
+ device_specific_diffs = device_specific.IncrementalOTA_GetBlockDifferences()
+ function_name = "IncrementalOTA_GetBlockDifferences"
+ else:
+ device_specific_diffs = device_specific.FullOTA_GetBlockDifferences()
+ function_name = "FullOTA_GetBlockDifferences"
+
+ if device_specific_diffs:
+ assert all(isinstance(diff, common.BlockDifference)
+ for diff in device_specific_diffs), \
+ "{} is not returning a list of BlockDifference objects".format(
+ function_name)
+ for diff in device_specific_diffs:
+ if diff.partition in block_diff_dict:
+ logger.warning("Duplicate block difference found. Device specific block"
+ " diff for partition '%s' overrides the one in generic"
+ " script.", diff.partition)
+ block_diff_dict[diff.partition] = diff
+
+ return block_diff_dict
+
+
+def WriteFullOTAPackage(input_zip, output_file):
+ target_info = common.BuildInfo(OPTIONS.info_dict, OPTIONS.oem_dicts)
+
+ # We don't know what version it will be installed on top of. We expect the API
+ # just won't change very often. Similarly for fstab, it might have changed in
+ # the target build.
+ target_api_version = target_info["recovery_api_version"]
+ script = edify_generator.EdifyGenerator(target_api_version, target_info)
+
+ if target_info.oem_props and not OPTIONS.oem_no_mount:
+ target_info.WriteMountOemScript(script)
+
+ metadata = GetPackageMetadata(target_info)
+
+ if not OPTIONS.no_signing:
+ staging_file = common.MakeTempFile(suffix='.zip')
+ else:
+ staging_file = output_file
+
+ output_zip = zipfile.ZipFile(
+ staging_file, "w", compression=zipfile.ZIP_DEFLATED)
+
+ device_specific = common.DeviceSpecificParams(
+ input_zip=input_zip,
+ input_version=target_api_version,
+ output_zip=output_zip,
+ script=script,
+ input_tmp=OPTIONS.input_tmp,
+ metadata=metadata,
+ info_dict=OPTIONS.info_dict)
+
+ assert HasRecoveryPatch(input_zip, info_dict=OPTIONS.info_dict)
+
+ # Assertions (e.g. downgrade check, device properties check).
+ ts = target_info.GetBuildProp("ro.build.date.utc")
+ ts_text = target_info.GetBuildProp("ro.build.date")
+ script.AssertOlderBuild(ts, ts_text)
+
+ target_info.WriteDeviceAssertions(script, OPTIONS.oem_no_mount)
+ device_specific.FullOTA_Assertions()
+
+ block_diff_dict = GetBlockDifferences(target_zip=input_zip, source_zip=None,
+ target_info=target_info,
+ source_info=None,
+ device_specific=device_specific)
+
+ # Two-step package strategy (in chronological order, which is *not*
+ # the order in which the generated script has things):
+ #
+ # if stage is not "2/3" or "3/3":
+ # write recovery image to boot partition
+ # set stage to "2/3"
+ # reboot to boot partition and restart recovery
+ # else if stage is "2/3":
+ # write recovery image to recovery partition
+ # set stage to "3/3"
+ # reboot to recovery partition and restart recovery
+ # else:
+ # (stage must be "3/3")
+ # set stage to ""
+ # do normal full package installation:
+ # wipe and install system, boot image, etc.
+ # set up system to update recovery partition on first boot
+ # complete script normally
+ # (allow recovery to mark itself finished and reboot)
+
+ recovery_img = common.GetBootableImage("recovery.img", "recovery.img",
+ OPTIONS.input_tmp, "RECOVERY")
+ if OPTIONS.two_step:
+ if not target_info.get("multistage_support"):
+ assert False, "two-step packages not supported by this build"
+ fs = target_info["fstab"]["/misc"]
+ assert fs.fs_type.upper() == "EMMC", \
+ "two-step packages only supported on devices with EMMC /misc partitions"
+ bcb_dev = {"bcb_dev": fs.device}
+ common.ZipWriteStr(output_zip, "recovery.img", recovery_img.data)
+ script.AppendExtra("""
+if get_stage("%(bcb_dev)s") == "2/3" then
+""" % bcb_dev)
+
+ # Stage 2/3: Write recovery image to /recovery (currently running /boot).
+ script.Comment("Stage 2/3")
+ script.WriteRawImage("/recovery", "recovery.img")
+ script.AppendExtra("""
+set_stage("%(bcb_dev)s", "3/3");
+reboot_now("%(bcb_dev)s", "recovery");
+else if get_stage("%(bcb_dev)s") == "3/3" then
+""" % bcb_dev)
+
+ # Stage 3/3: Make changes.
+ script.Comment("Stage 3/3")
+
+ # Dump fingerprints
+ script.Print("Target: {}".format(target_info.fingerprint))
+
+ device_specific.FullOTA_InstallBegin()
+
+ # All other partitions as well as the data wipe use 10% of the progress, and
+ # the update of the system partition takes the remaining progress.
+ system_progress = 0.9 - (len(block_diff_dict) - 1) * 0.1
+ if OPTIONS.wipe_user_data:
+ system_progress -= 0.1
+ progress_dict = {partition: 0.1 for partition in block_diff_dict}
+ progress_dict["system"] = system_progress
+
+ if target_info.get('use_dynamic_partitions') == "true":
+ # Use empty source_info_dict to indicate that all partitions / groups must
+ # be re-added.
+ dynamic_partitions_diff = common.DynamicPartitionsDifference(
+ info_dict=OPTIONS.info_dict,
+ block_diffs=block_diff_dict.values(),
+ progress_dict=progress_dict)
+ dynamic_partitions_diff.WriteScript(script, output_zip,
+ write_verify_script=OPTIONS.verify)
+ else:
+ for block_diff in block_diff_dict.values():
+ block_diff.WriteScript(script, output_zip,
+ progress=progress_dict.get(block_diff.partition),
+ write_verify_script=OPTIONS.verify)
+
+ CheckVintfIfTrebleEnabled(OPTIONS.input_tmp, target_info)
+
+ boot_img = common.GetBootableImage(
+ "boot.img", "boot.img", OPTIONS.input_tmp, "BOOT")
+ common.CheckSize(boot_img.data, "boot.img", target_info)
+ common.ZipWriteStr(output_zip, "boot.img", boot_img.data)
+
+ script.WriteRawImage("/boot", "boot.img")
+
+ script.ShowProgress(0.1, 10)
+ device_specific.FullOTA_InstallEnd()
+
+ if OPTIONS.extra_script is not None:
+ script.AppendExtra(OPTIONS.extra_script)
+
+ script.UnmountAll()
+
+ if OPTIONS.wipe_user_data:
+ script.ShowProgress(0.1, 10)
+ script.FormatPartition("/data")
+
+ if OPTIONS.two_step:
+ script.AppendExtra("""
+set_stage("%(bcb_dev)s", "");
+""" % bcb_dev)
+ script.AppendExtra("else\n")
+
+ # Stage 1/3: Nothing to verify for full OTA. Write recovery image to /boot.
+ script.Comment("Stage 1/3")
+ _WriteRecoveryImageToBoot(script, output_zip)
+
+ script.AppendExtra("""
+set_stage("%(bcb_dev)s", "2/3");
+reboot_now("%(bcb_dev)s", "");
+endif;
+endif;
+""" % bcb_dev)
+
+ script.SetProgress(1)
+ script.AddToZip(input_zip, output_zip, input_path=OPTIONS.updater_binary)
+ metadata["ota-required-cache"] = str(script.required_cache)
+
+ # We haven't written the metadata entry, which will be done in
+ # FinalizeMetadata.
+ common.ZipClose(output_zip)
+
+ needed_property_files = (
+ NonAbOtaPropertyFiles(),
+ )
+ FinalizeMetadata(metadata, staging_file, output_file, needed_property_files)
+
+
+def WriteBlockIncrementalOTAPackage(target_zip, source_zip, output_file):
+ target_info = common.BuildInfo(OPTIONS.target_info_dict, OPTIONS.oem_dicts)
+ source_info = common.BuildInfo(OPTIONS.source_info_dict, OPTIONS.oem_dicts)
+
+ target_api_version = target_info["recovery_api_version"]
+ source_api_version = source_info["recovery_api_version"]
+ if source_api_version == 0:
+ logger.warning(
+ "Generating edify script for a source that can't install it.")
+
+ script = edify_generator.EdifyGenerator(
+ source_api_version, target_info, fstab=source_info["fstab"])
+
+ if target_info.oem_props or source_info.oem_props:
+ if not OPTIONS.oem_no_mount:
+ source_info.WriteMountOemScript(script)
+
+ metadata = GetPackageMetadata(target_info, source_info)
+
+ if not OPTIONS.no_signing:
+ staging_file = common.MakeTempFile(suffix='.zip')
+ else:
+ staging_file = output_file
+
+ output_zip = zipfile.ZipFile(
+ staging_file, "w", compression=zipfile.ZIP_DEFLATED)
+
+ device_specific = common.DeviceSpecificParams(
+ source_zip=source_zip,
+ source_version=source_api_version,
+ source_tmp=OPTIONS.source_tmp,
+ target_zip=target_zip,
+ target_version=target_api_version,
+ target_tmp=OPTIONS.target_tmp,
+ output_zip=output_zip,
+ script=script,
+ metadata=metadata,
+ info_dict=source_info)
+
+ source_boot = common.GetBootableImage(
+ "/tmp/boot.img", "boot.img", OPTIONS.source_tmp, "BOOT", source_info)
+ target_boot = common.GetBootableImage(
+ "/tmp/boot.img", "boot.img", OPTIONS.target_tmp, "BOOT", target_info)
+ updating_boot = (not OPTIONS.two_step and
+ (source_boot.data != target_boot.data))
+
+ target_recovery = common.GetBootableImage(
+ "/tmp/recovery.img", "recovery.img", OPTIONS.target_tmp, "RECOVERY")
+
+ block_diff_dict = GetBlockDifferences(target_zip=target_zip,
+ source_zip=source_zip,
+ target_info=target_info,
+ source_info=source_info,
+ device_specific=device_specific)
+
+ CheckVintfIfTrebleEnabled(OPTIONS.target_tmp, target_info)
+
+ # Assertions (e.g. device properties check).
+ target_info.WriteDeviceAssertions(script, OPTIONS.oem_no_mount)
+ device_specific.IncrementalOTA_Assertions()
+
+ # Two-step incremental package strategy (in chronological order,
+ # which is *not* the order in which the generated script has
+ # things):
+ #
+ # if stage is not "2/3" or "3/3":
+ # do verification on current system
+ # write recovery image to boot partition
+ # set stage to "2/3"
+ # reboot to boot partition and restart recovery
+ # else if stage is "2/3":
+ # write recovery image to recovery partition
+ # set stage to "3/3"
+ # reboot to recovery partition and restart recovery
+ # else:
+ # (stage must be "3/3")
+ # perform update:
+ # patch system files, etc.
+ # force full install of new boot image
+ # set up system to update recovery partition on first boot
+ # complete script normally
+ # (allow recovery to mark itself finished and reboot)
+
+ if OPTIONS.two_step:
+ if not source_info.get("multistage_support"):
+ assert False, "two-step packages not supported by this build"
+ fs = source_info["fstab"]["/misc"]
+ assert fs.fs_type.upper() == "EMMC", \
+ "two-step packages only supported on devices with EMMC /misc partitions"
+ bcb_dev = {"bcb_dev": fs.device}
+ common.ZipWriteStr(output_zip, "recovery.img", target_recovery.data)
+ script.AppendExtra("""
+if get_stage("%(bcb_dev)s") == "2/3" then
+""" % bcb_dev)
+
+ # Stage 2/3: Write recovery image to /recovery (currently running /boot).
+ script.Comment("Stage 2/3")
+ script.AppendExtra("sleep(20);\n")
+ script.WriteRawImage("/recovery", "recovery.img")
+ script.AppendExtra("""
+set_stage("%(bcb_dev)s", "3/3");
+reboot_now("%(bcb_dev)s", "recovery");
+else if get_stage("%(bcb_dev)s") != "3/3" then
+""" % bcb_dev)
+
+ # Stage 1/3: (a) Verify the current system.
+ script.Comment("Stage 1/3")
+
+ # Dump fingerprints
+ script.Print("Source: {}".format(source_info.fingerprint))
+ script.Print("Target: {}".format(target_info.fingerprint))
+
+ script.Print("Verifying current system...")
+
+ device_specific.IncrementalOTA_VerifyBegin()
+
+ WriteFingerprintAssertion(script, target_info, source_info)
+
+ # Check the required cache size (i.e. stashed blocks).
+ required_cache_sizes = [diff.required_cache for diff in
+ block_diff_dict.values()]
+ if updating_boot:
+ boot_type, boot_device_expr = common.GetTypeAndDeviceExpr("/boot",
+ source_info)
+ d = common.Difference(target_boot, source_boot)
+ _, _, d = d.ComputePatch()
+ if d is None:
+ include_full_boot = True
+ common.ZipWriteStr(output_zip, "boot.img", target_boot.data)
+ else:
+ include_full_boot = False
+
+ logger.info(
+ "boot target: %d source: %d diff: %d", target_boot.size,
+ source_boot.size, len(d))
+
+ common.ZipWriteStr(output_zip, "boot.img.p", d)
+
+ target_expr = 'concat("{}:",{},":{}:{}")'.format(
+ boot_type, boot_device_expr, target_boot.size, target_boot.sha1)
+ source_expr = 'concat("{}:",{},":{}:{}")'.format(
+ boot_type, boot_device_expr, source_boot.size, source_boot.sha1)
+ script.PatchPartitionExprCheck(target_expr, source_expr)
+
+ required_cache_sizes.append(target_boot.size)
+
+ if required_cache_sizes:
+ script.CacheFreeSpaceCheck(max(required_cache_sizes))
+
+ # Verify the existing partitions.
+ for diff in block_diff_dict.values():
+ diff.WriteVerifyScript(script, touched_blocks_only=True)
+
+ device_specific.IncrementalOTA_VerifyEnd()
+
+ if OPTIONS.two_step:
+ # Stage 1/3: (b) Write recovery image to /boot.
+ _WriteRecoveryImageToBoot(script, output_zip)
+
+ script.AppendExtra("""
+set_stage("%(bcb_dev)s", "2/3");
+reboot_now("%(bcb_dev)s", "");
+else
+""" % bcb_dev)
+
+ # Stage 3/3: Make changes.
+ script.Comment("Stage 3/3")
+
+ script.Comment("---- start making changes here ----")
+
+ device_specific.IncrementalOTA_InstallBegin()
+
+ progress_dict = {partition: 0.1 for partition in block_diff_dict}
+ progress_dict["system"] = 1 - len(block_diff_dict) * 0.1
+
+ if OPTIONS.source_info_dict.get("use_dynamic_partitions") == "true":
+ if OPTIONS.target_info_dict.get("use_dynamic_partitions") != "true":
+ raise RuntimeError(
+ "can't generate incremental that disables dynamic partitions")
+ dynamic_partitions_diff = common.DynamicPartitionsDifference(
+ info_dict=OPTIONS.target_info_dict,
+ source_info_dict=OPTIONS.source_info_dict,
+ block_diffs=block_diff_dict.values(),
+ progress_dict=progress_dict)
+ dynamic_partitions_diff.WriteScript(
+ script, output_zip, write_verify_script=OPTIONS.verify)
+ else:
+ for block_diff in block_diff_dict.values():
+ block_diff.WriteScript(script, output_zip,
+ progress=progress_dict.get(block_diff.partition),
+ write_verify_script=OPTIONS.verify)
+
+ if OPTIONS.two_step:
+ common.ZipWriteStr(output_zip, "boot.img", target_boot.data)
+ script.WriteRawImage("/boot", "boot.img")
+ logger.info("writing full boot image (forced by two-step mode)")
+
+ if not OPTIONS.two_step:
+ if updating_boot:
+ if include_full_boot:
+ logger.info("boot image changed; including full.")
+ script.Print("Installing boot image...")
+ script.WriteRawImage("/boot", "boot.img")
+ else:
+ # Produce the boot image by applying a patch to the current
+ # contents of the boot partition, and write it back to the
+ # partition.
+ logger.info("boot image changed; including patch.")
+ script.Print("Patching boot image...")
+ script.ShowProgress(0.1, 10)
+ target_expr = 'concat("{}:",{},":{}:{}")'.format(
+ boot_type, boot_device_expr, target_boot.size, target_boot.sha1)
+ source_expr = 'concat("{}:",{},":{}:{}")'.format(
+ boot_type, boot_device_expr, source_boot.size, source_boot.sha1)
+ script.PatchPartitionExpr(target_expr, source_expr, '"boot.img.p"')
+ else:
+ logger.info("boot image unchanged; skipping.")
+
+ # Do device-specific installation (eg, write radio image).
+ device_specific.IncrementalOTA_InstallEnd()
+
+ if OPTIONS.extra_script is not None:
+ script.AppendExtra(OPTIONS.extra_script)
+
+ if OPTIONS.wipe_user_data:
+ script.Print("Erasing user data...")
+ script.FormatPartition("/data")
+
+ if OPTIONS.two_step:
+ script.AppendExtra("""
+set_stage("%(bcb_dev)s", "");
+endif;
+endif;
+""" % bcb_dev)
+
+ script.SetProgress(1)
+ # For downgrade OTAs, we prefer to use the update-binary in the source
+ # build that is actually newer than the one in the target build.
+ if OPTIONS.downgrade:
+ script.AddToZip(source_zip, output_zip, input_path=OPTIONS.updater_binary)
+ else:
+ script.AddToZip(target_zip, output_zip, input_path=OPTIONS.updater_binary)
+ metadata["ota-required-cache"] = str(script.required_cache)
+
+ # We haven't written the metadata entry yet, which will be handled in
+ # FinalizeMetadata().
+ common.ZipClose(output_zip)
+
+ # Sign the generated zip package unless no_signing is specified.
+ needed_property_files = (
+ NonAbOtaPropertyFiles(),
+ )
+ FinalizeMetadata(metadata, staging_file, output_file, needed_property_files)
+
+
+def GenerateNonAbOtaPackage(target_file, output_file, source_file=None):
+ """Generates a non-A/B OTA package."""
+ # Check the loaded info dicts first.
+ if OPTIONS.info_dict.get("no_recovery") == "true":
+ raise common.ExternalError(
+ "--- target build has specified no recovery ---")
+
+ # Non-A/B OTAs rely on /cache partition to store temporary files.
+ cache_size = OPTIONS.info_dict.get("cache_size")
+ if cache_size is None:
+ logger.warning("--- can't determine the cache partition size ---")
+ OPTIONS.cache_size = cache_size
+
+ if OPTIONS.extra_script is not None:
+ with open(OPTIONS.extra_script) as fp:
+ OPTIONS.extra_script = fp.read()
+
+ if OPTIONS.extracted_input is not None:
+ OPTIONS.input_tmp = OPTIONS.extracted_input
+ else:
+ logger.info("unzipping target target-files...")
+ OPTIONS.input_tmp = common.UnzipTemp(target_file, UNZIP_PATTERN)
+ OPTIONS.target_tmp = OPTIONS.input_tmp
+
+ # If the caller explicitly specified the device-specific extensions path via
+ # -s / --device_specific, use that. Otherwise, use META/releasetools.py if it
+ # is present in the target target_files. Otherwise, take the path of the file
+ # from 'tool_extensions' in the info dict and look for that in the local
+ # filesystem, relative to the current directory.
+ if OPTIONS.device_specific is None:
+ from_input = os.path.join(OPTIONS.input_tmp, "META", "releasetools.py")
+ if os.path.exists(from_input):
+ logger.info("(using device-specific extensions from target_files)")
+ OPTIONS.device_specific = from_input
+ else:
+ OPTIONS.device_specific = OPTIONS.info_dict.get("tool_extensions")
+
+ if OPTIONS.device_specific is not None:
+ OPTIONS.device_specific = os.path.abspath(OPTIONS.device_specific)
+
+ # Generate a full OTA.
+ if source_file is None:
+ with zipfile.ZipFile(target_file) as input_zip:
+ WriteFullOTAPackage(
+ input_zip,
+ output_file)
+
+ # Generate an incremental OTA.
+ else:
+ logger.info("unzipping source target-files...")
+ OPTIONS.source_tmp = common.UnzipTemp(
+ OPTIONS.incremental_source, UNZIP_PATTERN)
+ with zipfile.ZipFile(target_file) as input_zip, \
+ zipfile.ZipFile(source_file) as source_zip:
+ WriteBlockIncrementalOTAPackage(
+ input_zip,
+ source_zip,
+ output_file)
+
+
+def WriteFingerprintAssertion(script, target_info, source_info):
+ source_oem_props = source_info.oem_props
+ target_oem_props = target_info.oem_props
+
+ if source_oem_props is None and target_oem_props is None:
+ script.AssertSomeFingerprint(
+ source_info.fingerprint, target_info.fingerprint)
+ elif source_oem_props is not None and target_oem_props is not None:
+ script.AssertSomeThumbprint(
+ target_info.GetBuildProp("ro.build.thumbprint"),
+ source_info.GetBuildProp("ro.build.thumbprint"))
+ elif source_oem_props is None and target_oem_props is not None:
+ script.AssertFingerprintOrThumbprint(
+ source_info.fingerprint,
+ target_info.GetBuildProp("ro.build.thumbprint"))
+ else:
+ script.AssertFingerprintOrThumbprint(
+ target_info.fingerprint,
+ source_info.GetBuildProp("ro.build.thumbprint"))
+
+
+class NonAbOtaPropertyFiles(PropertyFiles):
+ """The property-files for non-A/B OTA.
+
+ For non-A/B OTA, the property-files string contains the info for METADATA
+ entry, with which a system updater can be fetched the package metadata prior
+ to downloading the entire package.
+ """
+
+ def __init__(self):
+ super(NonAbOtaPropertyFiles, self).__init__()
+ self.name = 'ota-property-files'
+
+
+def _WriteRecoveryImageToBoot(script, output_zip):
+ """Find and write recovery image to /boot in two-step OTA.
+
+ In two-step OTAs, we write recovery image to /boot as the first step so that
+ we can reboot to there and install a new recovery image to /recovery.
+ A special "recovery-two-step.img" will be preferred, which encodes the correct
+ path of "/boot". Otherwise the device may show "device is corrupt" message
+ when booting into /boot.
+
+ Fall back to using the regular recovery.img if the two-step recovery image
+ doesn't exist. Note that rebuilding the special image at this point may be
+ infeasible, because we don't have the desired boot signer and keys when
+ calling ota_from_target_files.py.
+ """
+
+ recovery_two_step_img_name = "recovery-two-step.img"
+ recovery_two_step_img_path = os.path.join(
+ OPTIONS.input_tmp, "OTA", recovery_two_step_img_name)
+ if os.path.exists(recovery_two_step_img_path):
+ common.ZipWrite(
+ output_zip,
+ recovery_two_step_img_path,
+ arcname=recovery_two_step_img_name)
+ logger.info(
+ "two-step package: using %s in stage 1/3", recovery_two_step_img_name)
+ script.WriteRawImage("/boot", recovery_two_step_img_name)
+ else:
+ logger.info("two-step package: using recovery.img in stage 1/3")
+ # The "recovery.img" entry has been written into package earlier.
+ script.WriteRawImage("/boot", "recovery.img")
+
+
+def HasRecoveryPatch(target_files_zip, info_dict):
+ board_uses_vendorimage = info_dict.get("board_uses_vendorimage") == "true"
+
+ if board_uses_vendorimage:
+ target_files_dir = "VENDOR"
+ else:
+ target_files_dir = "SYSTEM/vendor"
+
+ patch = "%s/recovery-from-boot.p" % target_files_dir
+ img = "%s/etc/recovery.img" % target_files_dir
+
+ namelist = target_files_zip.namelist()
+ return patch in namelist or img in namelist
diff --git a/tools/releasetools/ota_from_target_files.py b/tools/releasetools/ota_from_target_files.py
index b70044e3f8..f42974f6a5 100755
--- a/tools/releasetools/ota_from_target_files.py
+++ b/tools/releasetools/ota_from_target_files.py
@@ -206,9 +206,6 @@ A/B OTA specific options
from __future__ import print_function
-import collections
-import copy
-import itertools
import logging
import multiprocessing
import os.path
@@ -218,12 +215,12 @@ import struct
import sys
import zipfile
-import check_target_files_vintf
import common
-import edify_generator
import target_files_diff
-import verity_utils
-
+from check_target_files_vintf import CheckVintfIfTrebleEnabled
+from non_ab_ota import GenerateNonAbOtaPackage
+from ota_utils import (UNZIP_PATTERN, FinalizeMetadata, GetPackageMetadata,
+ PropertyFiles)
if sys.hexversion < 0x02070000:
print("Python 2.7 or newer is required.", file=sys.stderr)
@@ -270,11 +267,10 @@ OPTIONS.force_non_ab = False
OPTIONS.boot_variable_file = None
-METADATA_NAME = 'META-INF/com/android/metadata'
POSTINSTALL_CONFIG = 'META/postinstall_config.txt'
DYNAMIC_PARTITION_INFO = 'META/dynamic_partitions_info.txt'
AB_PARTITIONS = 'META/ab_partitions.txt'
-UNZIP_PATTERN = ['IMAGES/*', 'META/*', 'OTA/*', 'RADIO/*']
+
# Files to be unzipped for target diffing purpose.
TARGET_DIFFING_UNZIP_PATTERN = ['BOOT', 'RECOVERY', 'SYSTEM/*', 'VENDOR/*',
'PRODUCT/*', 'SYSTEM_EXT/*', 'ODM/*',
@@ -488,13 +484,6 @@ class Payload(object):
compress_type=zipfile.ZIP_STORED)
-def SignOutput(temp_zip_name, output_zip_name):
- pw = OPTIONS.key_passwords[OPTIONS.package_key]
-
- common.SignFile(temp_zip_name, output_zip_name, OPTIONS.package_key, pw,
- whole_file=True)
-
-
def _LoadOemDicts(oem_source):
"""Returns the list of loaded OEM properties dict."""
if not oem_source:
@@ -507,658 +496,6 @@ def _LoadOemDicts(oem_source):
return oem_dicts
-def _WriteRecoveryImageToBoot(script, output_zip):
- """Find and write recovery image to /boot in two-step OTA.
-
- In two-step OTAs, we write recovery image to /boot as the first step so that
- we can reboot to there and install a new recovery image to /recovery.
- A special "recovery-two-step.img" will be preferred, which encodes the correct
- path of "/boot". Otherwise the device may show "device is corrupt" message
- when booting into /boot.
-
- Fall back to using the regular recovery.img if the two-step recovery image
- doesn't exist. Note that rebuilding the special image at this point may be
- infeasible, because we don't have the desired boot signer and keys when
- calling ota_from_target_files.py.
- """
-
- recovery_two_step_img_name = "recovery-two-step.img"
- recovery_two_step_img_path = os.path.join(
- OPTIONS.input_tmp, "OTA", recovery_two_step_img_name)
- if os.path.exists(recovery_two_step_img_path):
- common.ZipWrite(
- output_zip,
- recovery_two_step_img_path,
- arcname=recovery_two_step_img_name)
- logger.info(
- "two-step package: using %s in stage 1/3", recovery_two_step_img_name)
- script.WriteRawImage("/boot", recovery_two_step_img_name)
- else:
- logger.info("two-step package: using recovery.img in stage 1/3")
- # The "recovery.img" entry has been written into package earlier.
- script.WriteRawImage("/boot", "recovery.img")
-
-
-def HasRecoveryPatch(target_files_zip, info_dict):
- board_uses_vendorimage = info_dict.get("board_uses_vendorimage") == "true"
-
- if board_uses_vendorimage:
- target_files_dir = "VENDOR"
- else:
- target_files_dir = "SYSTEM/vendor"
-
- patch = "%s/recovery-from-boot.p" % target_files_dir
- img = "%s/etc/recovery.img" % target_files_dir
-
- namelist = target_files_zip.namelist()
- return patch in namelist or img in namelist
-
-
-def HasPartition(target_files_zip, partition):
- try:
- target_files_zip.getinfo(partition.upper() + "/")
- return True
- except KeyError:
- return False
-
-
-def HasTrebleEnabled(target_files, target_info):
- def HasVendorPartition(target_files):
- if os.path.isdir(target_files):
- return os.path.isdir(os.path.join(target_files, "VENDOR"))
- if zipfile.is_zipfile(target_files):
- return HasPartition(zipfile.ZipFile(target_files), "vendor")
- raise ValueError("Unknown target_files argument")
-
- return (HasVendorPartition(target_files) and
- target_info.GetBuildProp("ro.treble.enabled") == "true")
-
-
-def WriteFingerprintAssertion(script, target_info, source_info):
- source_oem_props = source_info.oem_props
- target_oem_props = target_info.oem_props
-
- if source_oem_props is None and target_oem_props is None:
- script.AssertSomeFingerprint(
- source_info.fingerprint, target_info.fingerprint)
- elif source_oem_props is not None and target_oem_props is not None:
- script.AssertSomeThumbprint(
- target_info.GetBuildProp("ro.build.thumbprint"),
- source_info.GetBuildProp("ro.build.thumbprint"))
- elif source_oem_props is None and target_oem_props is not None:
- script.AssertFingerprintOrThumbprint(
- source_info.fingerprint,
- target_info.GetBuildProp("ro.build.thumbprint"))
- else:
- script.AssertFingerprintOrThumbprint(
- target_info.fingerprint,
- source_info.GetBuildProp("ro.build.thumbprint"))
-
-
-def CheckVintfIfTrebleEnabled(target_files, target_info):
- """Checks compatibility info of the input target files.
-
- Metadata used for compatibility verification is retrieved from target_zip.
-
- Compatibility should only be checked for devices that have enabled
- Treble support.
-
- Args:
- target_files: Path to zip file containing the source files to be included
- for OTA. Can also be the path to extracted directory.
- target_info: The BuildInfo instance that holds the target build info.
- """
-
- # Will only proceed if the target has enabled the Treble support (as well as
- # having a /vendor partition).
- if not HasTrebleEnabled(target_files, target_info):
- return
-
- # Skip adding the compatibility package as a workaround for b/114240221. The
- # compatibility will always fail on devices without qualified kernels.
- if OPTIONS.skip_compatibility_check:
- return
-
- if not check_target_files_vintf.CheckVintf(target_files, target_info):
- raise RuntimeError("VINTF compatibility check failed")
-
-
-def GetBlockDifferences(target_zip, source_zip, target_info, source_info,
- device_specific):
- """Returns a ordered dict of block differences with partition name as key."""
-
- def GetIncrementalBlockDifferenceForPartition(name):
- if not HasPartition(source_zip, name):
- raise RuntimeError(
- "can't generate incremental that adds {}".format(name))
-
- partition_src = common.GetUserImage(name, OPTIONS.source_tmp, source_zip,
- info_dict=source_info,
- allow_shared_blocks=allow_shared_blocks)
-
- hashtree_info_generator = verity_utils.CreateHashtreeInfoGenerator(
- name, 4096, target_info)
- partition_tgt = common.GetUserImage(name, OPTIONS.target_tmp, target_zip,
- info_dict=target_info,
- allow_shared_blocks=allow_shared_blocks,
- hashtree_info_generator=hashtree_info_generator)
-
- # Check the first block of the source system partition for remount R/W only
- # if the filesystem is ext4.
- partition_source_info = source_info["fstab"]["/" + name]
- check_first_block = partition_source_info.fs_type == "ext4"
- # Disable using imgdiff for squashfs. 'imgdiff -z' expects input files to be
- # in zip formats. However with squashfs, a) all files are compressed in LZ4;
- # b) the blocks listed in block map may not contain all the bytes for a
- # given file (because they're rounded to be 4K-aligned).
- partition_target_info = target_info["fstab"]["/" + name]
- disable_imgdiff = (partition_source_info.fs_type == "squashfs" or
- partition_target_info.fs_type == "squashfs")
- return common.BlockDifference(name, partition_tgt, partition_src,
- check_first_block,
- version=blockimgdiff_version,
- disable_imgdiff=disable_imgdiff)
-
- if source_zip:
- # See notes in common.GetUserImage()
- allow_shared_blocks = (source_info.get('ext4_share_dup_blocks') == "true" or
- target_info.get('ext4_share_dup_blocks') == "true")
- blockimgdiff_version = max(
- int(i) for i in target_info.get(
- "blockimgdiff_versions", "1").split(","))
- assert blockimgdiff_version >= 3
-
- block_diff_dict = collections.OrderedDict()
- partition_names = ["system", "vendor", "product", "odm", "system_ext",
- "vendor_dlkm", "odm_dlkm"]
- for partition in partition_names:
- if not HasPartition(target_zip, partition):
- continue
- # Full OTA update.
- if not source_zip:
- tgt = common.GetUserImage(partition, OPTIONS.input_tmp, target_zip,
- info_dict=target_info,
- reset_file_map=True)
- block_diff_dict[partition] = common.BlockDifference(partition, tgt,
- src=None)
- # Incremental OTA update.
- else:
- block_diff_dict[partition] = GetIncrementalBlockDifferenceForPartition(
- partition)
- assert "system" in block_diff_dict
-
- # Get the block diffs from the device specific script. If there is a
- # duplicate block diff for a partition, ignore the diff in the generic script
- # and use the one in the device specific script instead.
- if source_zip:
- device_specific_diffs = device_specific.IncrementalOTA_GetBlockDifferences()
- function_name = "IncrementalOTA_GetBlockDifferences"
- else:
- device_specific_diffs = device_specific.FullOTA_GetBlockDifferences()
- function_name = "FullOTA_GetBlockDifferences"
-
- if device_specific_diffs:
- assert all(isinstance(diff, common.BlockDifference)
- for diff in device_specific_diffs), \
- "{} is not returning a list of BlockDifference objects".format(
- function_name)
- for diff in device_specific_diffs:
- if diff.partition in block_diff_dict:
- logger.warning("Duplicate block difference found. Device specific block"
- " diff for partition '%s' overrides the one in generic"
- " script.", diff.partition)
- block_diff_dict[diff.partition] = diff
-
- return block_diff_dict
-
-
-def WriteFullOTAPackage(input_zip, output_file):
- target_info = common.BuildInfo(OPTIONS.info_dict, OPTIONS.oem_dicts)
-
- # We don't know what version it will be installed on top of. We expect the API
- # just won't change very often. Similarly for fstab, it might have changed in
- # the target build.
- target_api_version = target_info["recovery_api_version"]
- script = edify_generator.EdifyGenerator(target_api_version, target_info)
-
- if target_info.oem_props and not OPTIONS.oem_no_mount:
- target_info.WriteMountOemScript(script)
-
- metadata = GetPackageMetadata(target_info)
-
- if not OPTIONS.no_signing:
- staging_file = common.MakeTempFile(suffix='.zip')
- else:
- staging_file = output_file
-
- output_zip = zipfile.ZipFile(
- staging_file, "w", compression=zipfile.ZIP_DEFLATED)
-
- device_specific = common.DeviceSpecificParams(
- input_zip=input_zip,
- input_version=target_api_version,
- output_zip=output_zip,
- script=script,
- input_tmp=OPTIONS.input_tmp,
- metadata=metadata,
- info_dict=OPTIONS.info_dict)
-
- assert HasRecoveryPatch(input_zip, info_dict=OPTIONS.info_dict)
-
- # Assertions (e.g. downgrade check, device properties check).
- ts = target_info.GetBuildProp("ro.build.date.utc")
- ts_text = target_info.GetBuildProp("ro.build.date")
- script.AssertOlderBuild(ts, ts_text)
-
- target_info.WriteDeviceAssertions(script, OPTIONS.oem_no_mount)
- device_specific.FullOTA_Assertions()
-
- block_diff_dict = GetBlockDifferences(target_zip=input_zip, source_zip=None,
- target_info=target_info,
- source_info=None,
- device_specific=device_specific)
-
- # Two-step package strategy (in chronological order, which is *not*
- # the order in which the generated script has things):
- #
- # if stage is not "2/3" or "3/3":
- # write recovery image to boot partition
- # set stage to "2/3"
- # reboot to boot partition and restart recovery
- # else if stage is "2/3":
- # write recovery image to recovery partition
- # set stage to "3/3"
- # reboot to recovery partition and restart recovery
- # else:
- # (stage must be "3/3")
- # set stage to ""
- # do normal full package installation:
- # wipe and install system, boot image, etc.
- # set up system to update recovery partition on first boot
- # complete script normally
- # (allow recovery to mark itself finished and reboot)
-
- recovery_img = common.GetBootableImage("recovery.img", "recovery.img",
- OPTIONS.input_tmp, "RECOVERY")
- if OPTIONS.two_step:
- if not target_info.get("multistage_support"):
- assert False, "two-step packages not supported by this build"
- fs = target_info["fstab"]["/misc"]
- assert fs.fs_type.upper() == "EMMC", \
- "two-step packages only supported on devices with EMMC /misc partitions"
- bcb_dev = {"bcb_dev": fs.device}
- common.ZipWriteStr(output_zip, "recovery.img", recovery_img.data)
- script.AppendExtra("""
-if get_stage("%(bcb_dev)s") == "2/3" then
-""" % bcb_dev)
-
- # Stage 2/3: Write recovery image to /recovery (currently running /boot).
- script.Comment("Stage 2/3")
- script.WriteRawImage("/recovery", "recovery.img")
- script.AppendExtra("""
-set_stage("%(bcb_dev)s", "3/3");
-reboot_now("%(bcb_dev)s", "recovery");
-else if get_stage("%(bcb_dev)s") == "3/3" then
-""" % bcb_dev)
-
- # Stage 3/3: Make changes.
- script.Comment("Stage 3/3")
-
- # Dump fingerprints
- script.Print("Target: {}".format(target_info.fingerprint))
-
- device_specific.FullOTA_InstallBegin()
-
- # All other partitions as well as the data wipe use 10% of the progress, and
- # the update of the system partition takes the remaining progress.
- system_progress = 0.9 - (len(block_diff_dict) - 1) * 0.1
- if OPTIONS.wipe_user_data:
- system_progress -= 0.1
- progress_dict = {partition: 0.1 for partition in block_diff_dict}
- progress_dict["system"] = system_progress
-
- if target_info.get('use_dynamic_partitions') == "true":
- # Use empty source_info_dict to indicate that all partitions / groups must
- # be re-added.
- dynamic_partitions_diff = common.DynamicPartitionsDifference(
- info_dict=OPTIONS.info_dict,
- block_diffs=block_diff_dict.values(),
- progress_dict=progress_dict)
- dynamic_partitions_diff.WriteScript(script, output_zip,
- write_verify_script=OPTIONS.verify)
- else:
- for block_diff in block_diff_dict.values():
- block_diff.WriteScript(script, output_zip,
- progress=progress_dict.get(block_diff.partition),
- write_verify_script=OPTIONS.verify)
-
- CheckVintfIfTrebleEnabled(OPTIONS.input_tmp, target_info)
-
- boot_img = common.GetBootableImage(
- "boot.img", "boot.img", OPTIONS.input_tmp, "BOOT")
- common.CheckSize(boot_img.data, "boot.img", target_info)
- common.ZipWriteStr(output_zip, "boot.img", boot_img.data)
-
- script.WriteRawImage("/boot", "boot.img")
-
- script.ShowProgress(0.1, 10)
- device_specific.FullOTA_InstallEnd()
-
- if OPTIONS.extra_script is not None:
- script.AppendExtra(OPTIONS.extra_script)
-
- script.UnmountAll()
-
- if OPTIONS.wipe_user_data:
- script.ShowProgress(0.1, 10)
- script.FormatPartition("/data")
-
- if OPTIONS.two_step:
- script.AppendExtra("""
-set_stage("%(bcb_dev)s", "");
-""" % bcb_dev)
- script.AppendExtra("else\n")
-
- # Stage 1/3: Nothing to verify for full OTA. Write recovery image to /boot.
- script.Comment("Stage 1/3")
- _WriteRecoveryImageToBoot(script, output_zip)
-
- script.AppendExtra("""
-set_stage("%(bcb_dev)s", "2/3");
-reboot_now("%(bcb_dev)s", "");
-endif;
-endif;
-""" % bcb_dev)
-
- script.SetProgress(1)
- script.AddToZip(input_zip, output_zip, input_path=OPTIONS.updater_binary)
- metadata["ota-required-cache"] = str(script.required_cache)
-
- # We haven't written the metadata entry, which will be done in
- # FinalizeMetadata.
- common.ZipClose(output_zip)
-
- needed_property_files = (
- NonAbOtaPropertyFiles(),
- )
- FinalizeMetadata(metadata, staging_file, output_file, needed_property_files)
-
-
-def WriteMetadata(metadata, output):
- """Writes the metadata to the zip archive or a file.
-
- Args:
- metadata: The metadata dict for the package.
- output: A ZipFile object or a string of the output file path.
- """
-
- value = "".join(["%s=%s\n" % kv for kv in sorted(metadata.items())])
- if isinstance(output, zipfile.ZipFile):
- common.ZipWriteStr(output, METADATA_NAME, value,
- compress_type=zipfile.ZIP_STORED)
- return
-
- with open(output, 'w') as f:
- f.write(value)
-
-
-def HandleDowngradeMetadata(metadata, target_info, source_info):
- # Only incremental OTAs are allowed to reach here.
- assert OPTIONS.incremental_source is not None
-
- post_timestamp = target_info.GetBuildProp("ro.build.date.utc")
- pre_timestamp = source_info.GetBuildProp("ro.build.date.utc")
- is_downgrade = int(post_timestamp) < int(pre_timestamp)
-
- if OPTIONS.downgrade:
- if not is_downgrade:
- raise RuntimeError(
- "--downgrade or --override_timestamp specified but no downgrade "
- "detected: pre: %s, post: %s" % (pre_timestamp, post_timestamp))
- metadata["ota-downgrade"] = "yes"
- else:
- if is_downgrade:
- raise RuntimeError(
- "Downgrade detected based on timestamp check: pre: %s, post: %s. "
- "Need to specify --override_timestamp OR --downgrade to allow "
- "building the incremental." % (pre_timestamp, post_timestamp))
-
-
-def GetPackageMetadata(target_info, source_info=None):
- """Generates and returns the metadata dict.
-
- It generates a dict() that contains the info to be written into an OTA
- package (META-INF/com/android/metadata). It also handles the detection of
- downgrade / data wipe based on the global options.
-
- Args:
- target_info: The BuildInfo instance that holds the target build info.
- source_info: The BuildInfo instance that holds the source build info, or
- None if generating full OTA.
-
- Returns:
- A dict to be written into package metadata entry.
- """
- assert isinstance(target_info, common.BuildInfo)
- assert source_info is None or isinstance(source_info, common.BuildInfo)
-
- separator = '|'
-
- boot_variable_values = {}
- if OPTIONS.boot_variable_file:
- d = common.LoadDictionaryFromFile(OPTIONS.boot_variable_file)
- for key, values in d.items():
- boot_variable_values[key] = [val.strip() for val in values.split(',')]
-
- post_build_devices, post_build_fingerprints = \
- CalculateRuntimeDevicesAndFingerprints(target_info, boot_variable_values)
- metadata = {
- 'post-build': separator.join(sorted(post_build_fingerprints)),
- 'post-build-incremental': target_info.GetBuildProp(
- 'ro.build.version.incremental'),
- 'post-sdk-level': target_info.GetBuildProp(
- 'ro.build.version.sdk'),
- 'post-security-patch-level': target_info.GetBuildProp(
- 'ro.build.version.security_patch'),
- }
-
- if target_info.is_ab and not OPTIONS.force_non_ab:
- metadata['ota-type'] = 'AB'
- metadata['ota-required-cache'] = '0'
- else:
- metadata['ota-type'] = 'BLOCK'
-
- if OPTIONS.wipe_user_data:
- metadata['ota-wipe'] = 'yes'
-
- if OPTIONS.retrofit_dynamic_partitions:
- metadata['ota-retrofit-dynamic-partitions'] = 'yes'
-
- is_incremental = source_info is not None
- if is_incremental:
- pre_build_devices, pre_build_fingerprints = \
- CalculateRuntimeDevicesAndFingerprints(source_info,
- boot_variable_values)
- metadata['pre-build'] = separator.join(sorted(pre_build_fingerprints))
- metadata['pre-build-incremental'] = source_info.GetBuildProp(
- 'ro.build.version.incremental')
- metadata['pre-device'] = separator.join(sorted(pre_build_devices))
- else:
- metadata['pre-device'] = separator.join(sorted(post_build_devices))
-
- # Use the actual post-timestamp, even for a downgrade case.
- metadata['post-timestamp'] = target_info.GetBuildProp('ro.build.date.utc')
-
- # Detect downgrades and set up downgrade flags accordingly.
- if is_incremental:
- HandleDowngradeMetadata(metadata, target_info, source_info)
-
- return metadata
-
-
-class PropertyFiles(object):
- """A class that computes the property-files string for an OTA package.
-
- A property-files string is a comma-separated string that contains the
- offset/size info for an OTA package. The entries, which must be ZIP_STORED,
- can be fetched directly with the package URL along with the offset/size info.
- These strings can be used for streaming A/B OTAs, or allowing an updater to
- download package metadata entry directly, without paying the cost of
- downloading entire package.
-
- Computing the final property-files string requires two passes. Because doing
- the whole package signing (with signapk.jar) will possibly reorder the ZIP
- entries, which may in turn invalidate earlier computed ZIP entry offset/size
- values.
-
- This class provides functions to be called for each pass. The general flow is
- as follows.
-
- property_files = PropertyFiles()
- # The first pass, which writes placeholders before doing initial signing.
- property_files.Compute()
- SignOutput()
-
- # The second pass, by replacing the placeholders with actual data.
- property_files.Finalize()
- SignOutput()
-
- And the caller can additionally verify the final result.
-
- property_files.Verify()
- """
-
- def __init__(self):
- self.name = None
- self.required = ()
- self.optional = ()
-
- def Compute(self, input_zip):
- """Computes and returns a property-files string with placeholders.
-
- We reserve extra space for the offset and size of the metadata entry itself,
- although we don't know the final values until the package gets signed.
-
- Args:
- input_zip: The input ZIP file.
-
- Returns:
- A string with placeholders for the metadata offset/size info, e.g.
- "payload.bin:679:343,payload_properties.txt:378:45,metadata: ".
- """
- return self.GetPropertyFilesString(input_zip, reserve_space=True)
-
- class InsufficientSpaceException(Exception):
- pass
-
- def Finalize(self, input_zip, reserved_length):
- """Finalizes a property-files string with actual METADATA offset/size info.
-
- The input ZIP file has been signed, with the ZIP entries in the desired
- place (signapk.jar will possibly reorder the ZIP entries). Now we compute
- the ZIP entry offsets and construct the property-files string with actual
- data. Note that during this process, we must pad the property-files string
- to the reserved length, so that the METADATA entry size remains the same.
- Otherwise the entries' offsets and sizes may change again.
-
- Args:
- input_zip: The input ZIP file.
- reserved_length: The reserved length of the property-files string during
- the call to Compute(). The final string must be no more than this
- size.
-
- Returns:
- A property-files string including the metadata offset/size info, e.g.
- "payload.bin:679:343,payload_properties.txt:378:45,metadata:69:379 ".
-
- Raises:
- InsufficientSpaceException: If the reserved length is insufficient to hold
- the final string.
- """
- result = self.GetPropertyFilesString(input_zip, reserve_space=False)
- if len(result) > reserved_length:
- raise self.InsufficientSpaceException(
- 'Insufficient reserved space: reserved={}, actual={}'.format(
- reserved_length, len(result)))
-
- result += ' ' * (reserved_length - len(result))
- return result
-
- def Verify(self, input_zip, expected):
- """Verifies the input ZIP file contains the expected property-files string.
-
- Args:
- input_zip: The input ZIP file.
- expected: The property-files string that's computed from Finalize().
-
- Raises:
- AssertionError: On finding a mismatch.
- """
- actual = self.GetPropertyFilesString(input_zip)
- assert actual == expected, \
- "Mismatching streaming metadata: {} vs {}.".format(actual, expected)
-
- def GetPropertyFilesString(self, zip_file, reserve_space=False):
- """
- Constructs the property-files string per request.
-
- Args:
- zip_file: The input ZIP file.
- reserved_length: The reserved length of the property-files string.
-
- Returns:
- A property-files string including the metadata offset/size info, e.g.
- "payload.bin:679:343,payload_properties.txt:378:45,metadata: ".
- """
-
- def ComputeEntryOffsetSize(name):
- """Computes the zip entry offset and size."""
- info = zip_file.getinfo(name)
- offset = info.header_offset
- offset += zipfile.sizeFileHeader
- offset += len(info.extra) + len(info.filename)
- size = info.file_size
- return '%s:%d:%d' % (os.path.basename(name), offset, size)
-
- tokens = []
- tokens.extend(self._GetPrecomputed(zip_file))
- for entry in self.required:
- tokens.append(ComputeEntryOffsetSize(entry))
- for entry in self.optional:
- if entry in zip_file.namelist():
- tokens.append(ComputeEntryOffsetSize(entry))
-
- # 'META-INF/com/android/metadata' is required. We don't know its actual
- # offset and length (as well as the values for other entries). So we reserve
- # 15-byte as a placeholder ('offset:length'), which is sufficient to cover
- # the space for metadata entry. Because 'offset' allows a max of 10-digit
- # (i.e. ~9 GiB), with a max of 4-digit for the length. Note that all the
- # reserved space serves the metadata entry only.
- if reserve_space:
- tokens.append('metadata:' + ' ' * 15)
- else:
- tokens.append(ComputeEntryOffsetSize(METADATA_NAME))
-
- return ','.join(tokens)
-
- def _GetPrecomputed(self, input_zip):
- """Computes the additional tokens to be included into the property-files.
-
- This applies to tokens without actual ZIP entries, such as
- payload_metadadata.bin. We want to expose the offset/size to updaters, so
- that they can download the payload metadata directly with the info.
-
- Args:
- input_zip: The input zip file.
-
- Returns:
- A list of strings (tokens) to be added to the property-files string.
- """
- # pylint: disable=no-self-use
- # pylint: disable=unused-argument
- return []
-
-
class StreamingPropertyFiles(PropertyFiles):
"""A subclass for computing the property-files for streaming A/B OTAs."""
@@ -1264,362 +601,6 @@ class AbOtaPropertyFiles(StreamingPropertyFiles):
return (payload_offset, metadata_total)
-class NonAbOtaPropertyFiles(PropertyFiles):
- """The property-files for non-A/B OTA.
-
- For non-A/B OTA, the property-files string contains the info for METADATA
- entry, with which a system updater can be fetched the package metadata prior
- to downloading the entire package.
- """
-
- def __init__(self):
- super(NonAbOtaPropertyFiles, self).__init__()
- self.name = 'ota-property-files'
-
-
-def FinalizeMetadata(metadata, input_file, output_file, needed_property_files):
- """Finalizes the metadata and signs an A/B OTA package.
-
- In order to stream an A/B OTA package, we need 'ota-streaming-property-files'
- that contains the offsets and sizes for the ZIP entries. An example
- property-files string is as follows.
-
- "payload.bin:679:343,payload_properties.txt:378:45,metadata:69:379"
-
- OTA server can pass down this string, in addition to the package URL, to the
- system update client. System update client can then fetch individual ZIP
- entries (ZIP_STORED) directly at the given offset of the URL.
-
- Args:
- metadata: The metadata dict for the package.
- input_file: The input ZIP filename that doesn't contain the package METADATA
- entry yet.
- output_file: The final output ZIP filename.
- needed_property_files: The list of PropertyFiles' to be generated.
- """
-
- def ComputeAllPropertyFiles(input_file, needed_property_files):
- # Write the current metadata entry with placeholders.
- with zipfile.ZipFile(input_file) as input_zip:
- for property_files in needed_property_files:
- metadata[property_files.name] = property_files.Compute(input_zip)
- namelist = input_zip.namelist()
-
- if METADATA_NAME in namelist:
- common.ZipDelete(input_file, METADATA_NAME)
- output_zip = zipfile.ZipFile(input_file, 'a')
- WriteMetadata(metadata, output_zip)
- common.ZipClose(output_zip)
-
- if OPTIONS.no_signing:
- return input_file
-
- prelim_signing = common.MakeTempFile(suffix='.zip')
- SignOutput(input_file, prelim_signing)
- return prelim_signing
-
- def FinalizeAllPropertyFiles(prelim_signing, needed_property_files):
- with zipfile.ZipFile(prelim_signing) as prelim_signing_zip:
- for property_files in needed_property_files:
- metadata[property_files.name] = property_files.Finalize(
- prelim_signing_zip, len(metadata[property_files.name]))
-
- # SignOutput(), which in turn calls signapk.jar, will possibly reorder the ZIP
- # entries, as well as padding the entry headers. We do a preliminary signing
- # (with an incomplete metadata entry) to allow that to happen. Then compute
- # the ZIP entry offsets, write back the final metadata and do the final
- # signing.
- prelim_signing = ComputeAllPropertyFiles(input_file, needed_property_files)
- try:
- FinalizeAllPropertyFiles(prelim_signing, needed_property_files)
- except PropertyFiles.InsufficientSpaceException:
- # Even with the preliminary signing, the entry orders may change
- # dramatically, which leads to insufficiently reserved space during the
- # first call to ComputeAllPropertyFiles(). In that case, we redo all the
- # preliminary signing works, based on the already ordered ZIP entries, to
- # address the issue.
- prelim_signing = ComputeAllPropertyFiles(
- prelim_signing, needed_property_files)
- FinalizeAllPropertyFiles(prelim_signing, needed_property_files)
-
- # Replace the METADATA entry.
- common.ZipDelete(prelim_signing, METADATA_NAME)
- output_zip = zipfile.ZipFile(prelim_signing, 'a')
- WriteMetadata(metadata, output_zip)
- common.ZipClose(output_zip)
-
- # Re-sign the package after updating the metadata entry.
- if OPTIONS.no_signing:
- output_file = prelim_signing
- else:
- SignOutput(prelim_signing, output_file)
-
- # Reopen the final signed zip to double check the streaming metadata.
- with zipfile.ZipFile(output_file) as output_zip:
- for property_files in needed_property_files:
- property_files.Verify(output_zip, metadata[property_files.name].strip())
-
- # If requested, dump the metadata to a separate file.
- output_metadata_path = OPTIONS.output_metadata_path
- if output_metadata_path:
- WriteMetadata(metadata, output_metadata_path)
-
-
-def WriteBlockIncrementalOTAPackage(target_zip, source_zip, output_file):
- target_info = common.BuildInfo(OPTIONS.target_info_dict, OPTIONS.oem_dicts)
- source_info = common.BuildInfo(OPTIONS.source_info_dict, OPTIONS.oem_dicts)
-
- target_api_version = target_info["recovery_api_version"]
- source_api_version = source_info["recovery_api_version"]
- if source_api_version == 0:
- logger.warning(
- "Generating edify script for a source that can't install it.")
-
- script = edify_generator.EdifyGenerator(
- source_api_version, target_info, fstab=source_info["fstab"])
-
- if target_info.oem_props or source_info.oem_props:
- if not OPTIONS.oem_no_mount:
- source_info.WriteMountOemScript(script)
-
- metadata = GetPackageMetadata(target_info, source_info)
-
- if not OPTIONS.no_signing:
- staging_file = common.MakeTempFile(suffix='.zip')
- else:
- staging_file = output_file
-
- output_zip = zipfile.ZipFile(
- staging_file, "w", compression=zipfile.ZIP_DEFLATED)
-
- device_specific = common.DeviceSpecificParams(
- source_zip=source_zip,
- source_version=source_api_version,
- source_tmp=OPTIONS.source_tmp,
- target_zip=target_zip,
- target_version=target_api_version,
- target_tmp=OPTIONS.target_tmp,
- output_zip=output_zip,
- script=script,
- metadata=metadata,
- info_dict=source_info)
-
- source_boot = common.GetBootableImage(
- "/tmp/boot.img", "boot.img", OPTIONS.source_tmp, "BOOT", source_info)
- target_boot = common.GetBootableImage(
- "/tmp/boot.img", "boot.img", OPTIONS.target_tmp, "BOOT", target_info)
- updating_boot = (not OPTIONS.two_step and
- (source_boot.data != target_boot.data))
-
- target_recovery = common.GetBootableImage(
- "/tmp/recovery.img", "recovery.img", OPTIONS.target_tmp, "RECOVERY")
-
- block_diff_dict = GetBlockDifferences(target_zip=target_zip,
- source_zip=source_zip,
- target_info=target_info,
- source_info=source_info,
- device_specific=device_specific)
-
- CheckVintfIfTrebleEnabled(OPTIONS.target_tmp, target_info)
-
- # Assertions (e.g. device properties check).
- target_info.WriteDeviceAssertions(script, OPTIONS.oem_no_mount)
- device_specific.IncrementalOTA_Assertions()
-
- # Two-step incremental package strategy (in chronological order,
- # which is *not* the order in which the generated script has
- # things):
- #
- # if stage is not "2/3" or "3/3":
- # do verification on current system
- # write recovery image to boot partition
- # set stage to "2/3"
- # reboot to boot partition and restart recovery
- # else if stage is "2/3":
- # write recovery image to recovery partition
- # set stage to "3/3"
- # reboot to recovery partition and restart recovery
- # else:
- # (stage must be "3/3")
- # perform update:
- # patch system files, etc.
- # force full install of new boot image
- # set up system to update recovery partition on first boot
- # complete script normally
- # (allow recovery to mark itself finished and reboot)
-
- if OPTIONS.two_step:
- if not source_info.get("multistage_support"):
- assert False, "two-step packages not supported by this build"
- fs = source_info["fstab"]["/misc"]
- assert fs.fs_type.upper() == "EMMC", \
- "two-step packages only supported on devices with EMMC /misc partitions"
- bcb_dev = {"bcb_dev": fs.device}
- common.ZipWriteStr(output_zip, "recovery.img", target_recovery.data)
- script.AppendExtra("""
-if get_stage("%(bcb_dev)s") == "2/3" then
-""" % bcb_dev)
-
- # Stage 2/3: Write recovery image to /recovery (currently running /boot).
- script.Comment("Stage 2/3")
- script.AppendExtra("sleep(20);\n")
- script.WriteRawImage("/recovery", "recovery.img")
- script.AppendExtra("""
-set_stage("%(bcb_dev)s", "3/3");
-reboot_now("%(bcb_dev)s", "recovery");
-else if get_stage("%(bcb_dev)s") != "3/3" then
-""" % bcb_dev)
-
- # Stage 1/3: (a) Verify the current system.
- script.Comment("Stage 1/3")
-
- # Dump fingerprints
- script.Print("Source: {}".format(source_info.fingerprint))
- script.Print("Target: {}".format(target_info.fingerprint))
-
- script.Print("Verifying current system...")
-
- device_specific.IncrementalOTA_VerifyBegin()
-
- WriteFingerprintAssertion(script, target_info, source_info)
-
- # Check the required cache size (i.e. stashed blocks).
- required_cache_sizes = [diff.required_cache for diff in
- block_diff_dict.values()]
- if updating_boot:
- boot_type, boot_device_expr = common.GetTypeAndDeviceExpr("/boot",
- source_info)
- d = common.Difference(target_boot, source_boot)
- _, _, d = d.ComputePatch()
- if d is None:
- include_full_boot = True
- common.ZipWriteStr(output_zip, "boot.img", target_boot.data)
- else:
- include_full_boot = False
-
- logger.info(
- "boot target: %d source: %d diff: %d", target_boot.size,
- source_boot.size, len(d))
-
- common.ZipWriteStr(output_zip, "boot.img.p", d)
-
- target_expr = 'concat("{}:",{},":{}:{}")'.format(
- boot_type, boot_device_expr, target_boot.size, target_boot.sha1)
- source_expr = 'concat("{}:",{},":{}:{}")'.format(
- boot_type, boot_device_expr, source_boot.size, source_boot.sha1)
- script.PatchPartitionExprCheck(target_expr, source_expr)
-
- required_cache_sizes.append(target_boot.size)
-
- if required_cache_sizes:
- script.CacheFreeSpaceCheck(max(required_cache_sizes))
-
- # Verify the existing partitions.
- for diff in block_diff_dict.values():
- diff.WriteVerifyScript(script, touched_blocks_only=True)
-
- device_specific.IncrementalOTA_VerifyEnd()
-
- if OPTIONS.two_step:
- # Stage 1/3: (b) Write recovery image to /boot.
- _WriteRecoveryImageToBoot(script, output_zip)
-
- script.AppendExtra("""
-set_stage("%(bcb_dev)s", "2/3");
-reboot_now("%(bcb_dev)s", "");
-else
-""" % bcb_dev)
-
- # Stage 3/3: Make changes.
- script.Comment("Stage 3/3")
-
- script.Comment("---- start making changes here ----")
-
- device_specific.IncrementalOTA_InstallBegin()
-
- progress_dict = {partition: 0.1 for partition in block_diff_dict}
- progress_dict["system"] = 1 - len(block_diff_dict) * 0.1
-
- if OPTIONS.source_info_dict.get("use_dynamic_partitions") == "true":
- if OPTIONS.target_info_dict.get("use_dynamic_partitions") != "true":
- raise RuntimeError(
- "can't generate incremental that disables dynamic partitions")
- dynamic_partitions_diff = common.DynamicPartitionsDifference(
- info_dict=OPTIONS.target_info_dict,
- source_info_dict=OPTIONS.source_info_dict,
- block_diffs=block_diff_dict.values(),
- progress_dict=progress_dict)
- dynamic_partitions_diff.WriteScript(
- script, output_zip, write_verify_script=OPTIONS.verify)
- else:
- for block_diff in block_diff_dict.values():
- block_diff.WriteScript(script, output_zip,
- progress=progress_dict.get(block_diff.partition),
- write_verify_script=OPTIONS.verify)
-
- if OPTIONS.two_step:
- common.ZipWriteStr(output_zip, "boot.img", target_boot.data)
- script.WriteRawImage("/boot", "boot.img")
- logger.info("writing full boot image (forced by two-step mode)")
-
- if not OPTIONS.two_step:
- if updating_boot:
- if include_full_boot:
- logger.info("boot image changed; including full.")
- script.Print("Installing boot image...")
- script.WriteRawImage("/boot", "boot.img")
- else:
- # Produce the boot image by applying a patch to the current
- # contents of the boot partition, and write it back to the
- # partition.
- logger.info("boot image changed; including patch.")
- script.Print("Patching boot image...")
- script.ShowProgress(0.1, 10)
- target_expr = 'concat("{}:",{},":{}:{}")'.format(
- boot_type, boot_device_expr, target_boot.size, target_boot.sha1)
- source_expr = 'concat("{}:",{},":{}:{}")'.format(
- boot_type, boot_device_expr, source_boot.size, source_boot.sha1)
- script.PatchPartitionExpr(target_expr, source_expr, '"boot.img.p"')
- else:
- logger.info("boot image unchanged; skipping.")
-
- # Do device-specific installation (eg, write radio image).
- device_specific.IncrementalOTA_InstallEnd()
-
- if OPTIONS.extra_script is not None:
- script.AppendExtra(OPTIONS.extra_script)
-
- if OPTIONS.wipe_user_data:
- script.Print("Erasing user data...")
- script.FormatPartition("/data")
-
- if OPTIONS.two_step:
- script.AppendExtra("""
-set_stage("%(bcb_dev)s", "");
-endif;
-endif;
-""" % bcb_dev)
-
- script.SetProgress(1)
- # For downgrade OTAs, we prefer to use the update-binary in the source
- # build that is actually newer than the one in the target build.
- if OPTIONS.downgrade:
- script.AddToZip(source_zip, output_zip, input_path=OPTIONS.updater_binary)
- else:
- script.AddToZip(target_zip, output_zip, input_path=OPTIONS.updater_binary)
- metadata["ota-required-cache"] = str(script.required_cache)
-
- # We haven't written the metadata entry yet, which will be handled in
- # FinalizeMetadata().
- common.ZipClose(output_zip)
-
- # Sign the generated zip package unless no_signing is specified.
- needed_property_files = (
- NonAbOtaPropertyFiles(),
- )
- FinalizeMetadata(metadata, staging_file, output_file, needed_property_files)
-
-
def GetTargetFilesZipForSecondaryImages(input_file, skip_postinstall=False):
"""Returns a target-files.zip file for generating secondary payload.
@@ -1938,104 +919,6 @@ def GenerateAbOtaPackage(target_file, output_file, source_file=None):
FinalizeMetadata(metadata, staging_file, output_file, needed_property_files)
-def GenerateNonAbOtaPackage(target_file, output_file, source_file=None):
- """Generates a non-A/B OTA package."""
- # Check the loaded info dicts first.
- if OPTIONS.info_dict.get("no_recovery") == "true":
- raise common.ExternalError(
- "--- target build has specified no recovery ---")
-
- # Non-A/B OTAs rely on /cache partition to store temporary files.
- cache_size = OPTIONS.info_dict.get("cache_size")
- if cache_size is None:
- logger.warning("--- can't determine the cache partition size ---")
- OPTIONS.cache_size = cache_size
-
- if OPTIONS.extra_script is not None:
- with open(OPTIONS.extra_script) as fp:
- OPTIONS.extra_script = fp.read()
-
- if OPTIONS.extracted_input is not None:
- OPTIONS.input_tmp = OPTIONS.extracted_input
- else:
- logger.info("unzipping target target-files...")
- OPTIONS.input_tmp = common.UnzipTemp(target_file, UNZIP_PATTERN)
- OPTIONS.target_tmp = OPTIONS.input_tmp
-
- # If the caller explicitly specified the device-specific extensions path via
- # -s / --device_specific, use that. Otherwise, use META/releasetools.py if it
- # is present in the target target_files. Otherwise, take the path of the file
- # from 'tool_extensions' in the info dict and look for that in the local
- # filesystem, relative to the current directory.
- if OPTIONS.device_specific is None:
- from_input = os.path.join(OPTIONS.input_tmp, "META", "releasetools.py")
- if os.path.exists(from_input):
- logger.info("(using device-specific extensions from target_files)")
- OPTIONS.device_specific = from_input
- else:
- OPTIONS.device_specific = OPTIONS.info_dict.get("tool_extensions")
-
- if OPTIONS.device_specific is not None:
- OPTIONS.device_specific = os.path.abspath(OPTIONS.device_specific)
-
- # Generate a full OTA.
- if source_file is None:
- with zipfile.ZipFile(target_file) as input_zip:
- WriteFullOTAPackage(
- input_zip,
- output_file)
-
- # Generate an incremental OTA.
- else:
- logger.info("unzipping source target-files...")
- OPTIONS.source_tmp = common.UnzipTemp(
- OPTIONS.incremental_source, UNZIP_PATTERN)
- with zipfile.ZipFile(target_file) as input_zip, \
- zipfile.ZipFile(source_file) as source_zip:
- WriteBlockIncrementalOTAPackage(
- input_zip,
- source_zip,
- output_file)
-
-
-def CalculateRuntimeDevicesAndFingerprints(build_info, boot_variable_values):
- """Returns a tuple of sets for runtime devices and fingerprints"""
-
- device_names = {build_info.device}
- fingerprints = {build_info.fingerprint}
-
- if not boot_variable_values:
- return device_names, fingerprints
-
- # Calculate all possible combinations of the values for the boot variables.
- keys = boot_variable_values.keys()
- value_list = boot_variable_values.values()
- combinations = [dict(zip(keys, values))
- for values in itertools.product(*value_list)]
- for placeholder_values in combinations:
- # Reload the info_dict as some build properties may change their values
- # based on the value of ro.boot* properties.
- info_dict = copy.deepcopy(build_info.info_dict)
- for partition in common.PARTITIONS_WITH_CARE_MAP:
- partition_prop_key = "{}.build.prop".format(partition)
- input_file = info_dict[partition_prop_key].input_file
- if isinstance(input_file, zipfile.ZipFile):
- with zipfile.ZipFile(input_file.filename) as input_zip:
- info_dict[partition_prop_key] = \
- common.PartitionBuildProps.FromInputFile(input_zip, partition,
- placeholder_values)
- else:
- info_dict[partition_prop_key] = \
- common.PartitionBuildProps.FromInputFile(input_file, partition,
- placeholder_values)
- info_dict["build.prop"] = info_dict["system.build.prop"]
-
- new_build_info = common.BuildInfo(info_dict, build_info.oem_dicts)
- device_names.add(new_build_info.device)
- fingerprints.add(new_build_info.fingerprint)
- return device_names, fingerprints
-
-
def main(argv):
def option_handler(o, a):
diff --git a/tools/releasetools/ota_utils.py b/tools/releasetools/ota_utils.py
new file mode 100644
index 0000000000..874ab951c1
--- /dev/null
+++ b/tools/releasetools/ota_utils.py
@@ -0,0 +1,433 @@
+# Copyright (C) 2020 The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import copy
+import itertools
+import os
+import zipfile
+
+from common import (ZipDelete, ZipClose, OPTIONS, MakeTempFile,
+ ZipWriteStr, BuildInfo, LoadDictionaryFromFile,
+ SignFile, PARTITIONS_WITH_CARE_MAP, PartitionBuildProps)
+
+METADATA_NAME = 'META-INF/com/android/metadata'
+UNZIP_PATTERN = ['IMAGES/*', 'META/*', 'OTA/*', 'RADIO/*']
+
+
+def FinalizeMetadata(metadata, input_file, output_file, needed_property_files):
+ """Finalizes the metadata and signs an A/B OTA package.
+
+ In order to stream an A/B OTA package, we need 'ota-streaming-property-files'
+ that contains the offsets and sizes for the ZIP entries. An example
+ property-files string is as follows.
+
+ "payload.bin:679:343,payload_properties.txt:378:45,metadata:69:379"
+
+ OTA server can pass down this string, in addition to the package URL, to the
+ system update client. System update client can then fetch individual ZIP
+ entries (ZIP_STORED) directly at the given offset of the URL.
+
+ Args:
+ metadata: The metadata dict for the package.
+ input_file: The input ZIP filename that doesn't contain the package METADATA
+ entry yet.
+ output_file: The final output ZIP filename.
+ needed_property_files: The list of PropertyFiles' to be generated.
+ """
+
+ def ComputeAllPropertyFiles(input_file, needed_property_files):
+ # Write the current metadata entry with placeholders.
+ with zipfile.ZipFile(input_file) as input_zip:
+ for property_files in needed_property_files:
+ metadata[property_files.name] = property_files.Compute(input_zip)
+ namelist = input_zip.namelist()
+
+ if METADATA_NAME in namelist:
+ ZipDelete(input_file, METADATA_NAME)
+ output_zip = zipfile.ZipFile(input_file, 'a')
+ WriteMetadata(metadata, output_zip)
+ ZipClose(output_zip)
+
+ if OPTIONS.no_signing:
+ return input_file
+
+ prelim_signing = MakeTempFile(suffix='.zip')
+ SignOutput(input_file, prelim_signing)
+ return prelim_signing
+
+ def FinalizeAllPropertyFiles(prelim_signing, needed_property_files):
+ with zipfile.ZipFile(prelim_signing) as prelim_signing_zip:
+ for property_files in needed_property_files:
+ metadata[property_files.name] = property_files.Finalize(
+ prelim_signing_zip, len(metadata[property_files.name]))
+
+ # SignOutput(), which in turn calls signapk.jar, will possibly reorder the ZIP
+ # entries, as well as padding the entry headers. We do a preliminary signing
+ # (with an incomplete metadata entry) to allow that to happen. Then compute
+ # the ZIP entry offsets, write back the final metadata and do the final
+ # signing.
+ prelim_signing = ComputeAllPropertyFiles(input_file, needed_property_files)
+ try:
+ FinalizeAllPropertyFiles(prelim_signing, needed_property_files)
+ except PropertyFiles.InsufficientSpaceException:
+ # Even with the preliminary signing, the entry orders may change
+ # dramatically, which leads to insufficiently reserved space during the
+ # first call to ComputeAllPropertyFiles(). In that case, we redo all the
+ # preliminary signing works, based on the already ordered ZIP entries, to
+ # address the issue.
+ prelim_signing = ComputeAllPropertyFiles(
+ prelim_signing, needed_property_files)
+ FinalizeAllPropertyFiles(prelim_signing, needed_property_files)
+
+ # Replace the METADATA entry.
+ ZipDelete(prelim_signing, METADATA_NAME)
+ output_zip = zipfile.ZipFile(prelim_signing, 'a')
+ WriteMetadata(metadata, output_zip)
+ ZipClose(output_zip)
+
+ # Re-sign the package after updating the metadata entry.
+ if OPTIONS.no_signing:
+ output_file = prelim_signing
+ else:
+ SignOutput(prelim_signing, output_file)
+
+ # Reopen the final signed zip to double check the streaming metadata.
+ with zipfile.ZipFile(output_file) as output_zip:
+ for property_files in needed_property_files:
+ property_files.Verify(output_zip, metadata[property_files.name].strip())
+
+ # If requested, dump the metadata to a separate file.
+ output_metadata_path = OPTIONS.output_metadata_path
+ if output_metadata_path:
+ WriteMetadata(metadata, output_metadata_path)
+
+
+def WriteMetadata(metadata, output):
+ """Writes the metadata to the zip archive or a file.
+
+ Args:
+ metadata: The metadata dict for the package.
+ output: A ZipFile object or a string of the output file path.
+ """
+
+ value = "".join(["%s=%s\n" % kv for kv in sorted(metadata.items())])
+ if isinstance(output, zipfile.ZipFile):
+ ZipWriteStr(output, METADATA_NAME, value,
+ compress_type=zipfile.ZIP_STORED)
+ return
+
+ with open(output, 'w') as f:
+ f.write(value)
+
+
+def GetPackageMetadata(target_info, source_info=None):
+ """Generates and returns the metadata dict.
+
+ It generates a dict() that contains the info to be written into an OTA
+ package (META-INF/com/android/metadata). It also handles the detection of
+ downgrade / data wipe based on the global options.
+
+ Args:
+ target_info: The BuildInfo instance that holds the target build info.
+ source_info: The BuildInfo instance that holds the source build info, or
+ None if generating full OTA.
+
+ Returns:
+ A dict to be written into package metadata entry.
+ """
+ assert isinstance(target_info, BuildInfo)
+ assert source_info is None or isinstance(source_info, BuildInfo)
+
+ separator = '|'
+
+ boot_variable_values = {}
+ if OPTIONS.boot_variable_file:
+ d = LoadDictionaryFromFile(OPTIONS.boot_variable_file)
+ for key, values in d.items():
+ boot_variable_values[key] = [val.strip() for val in values.split(',')]
+
+ post_build_devices, post_build_fingerprints = \
+ CalculateRuntimeDevicesAndFingerprints(target_info, boot_variable_values)
+ metadata = {
+ 'post-build': separator.join(sorted(post_build_fingerprints)),
+ 'post-build-incremental': target_info.GetBuildProp(
+ 'ro.build.version.incremental'),
+ 'post-sdk-level': target_info.GetBuildProp(
+ 'ro.build.version.sdk'),
+ 'post-security-patch-level': target_info.GetBuildProp(
+ 'ro.build.version.security_patch'),
+ }
+
+ if target_info.is_ab and not OPTIONS.force_non_ab:
+ metadata['ota-type'] = 'AB'
+ metadata['ota-required-cache'] = '0'
+ else:
+ metadata['ota-type'] = 'BLOCK'
+
+ if OPTIONS.wipe_user_data:
+ metadata['ota-wipe'] = 'yes'
+
+ if OPTIONS.retrofit_dynamic_partitions:
+ metadata['ota-retrofit-dynamic-partitions'] = 'yes'
+
+ is_incremental = source_info is not None
+ if is_incremental:
+ pre_build_devices, pre_build_fingerprints = \
+ CalculateRuntimeDevicesAndFingerprints(source_info,
+ boot_variable_values)
+ metadata['pre-build'] = separator.join(sorted(pre_build_fingerprints))
+ metadata['pre-build-incremental'] = source_info.GetBuildProp(
+ 'ro.build.version.incremental')
+ metadata['pre-device'] = separator.join(sorted(pre_build_devices))
+ else:
+ metadata['pre-device'] = separator.join(sorted(post_build_devices))
+
+ # Use the actual post-timestamp, even for a downgrade case.
+ metadata['post-timestamp'] = target_info.GetBuildProp('ro.build.date.utc')
+
+ # Detect downgrades and set up downgrade flags accordingly.
+ if is_incremental:
+ HandleDowngradeMetadata(metadata, target_info, source_info)
+
+ return metadata
+
+
+def HandleDowngradeMetadata(metadata, target_info, source_info):
+ # Only incremental OTAs are allowed to reach here.
+ assert OPTIONS.incremental_source is not None
+
+ post_timestamp = target_info.GetBuildProp("ro.build.date.utc")
+ pre_timestamp = source_info.GetBuildProp("ro.build.date.utc")
+ is_downgrade = int(post_timestamp) < int(pre_timestamp)
+
+ if OPTIONS.downgrade:
+ if not is_downgrade:
+ raise RuntimeError(
+ "--downgrade or --override_timestamp specified but no downgrade "
+ "detected: pre: %s, post: %s" % (pre_timestamp, post_timestamp))
+ metadata["ota-downgrade"] = "yes"
+ else:
+ if is_downgrade:
+ raise RuntimeError(
+ "Downgrade detected based on timestamp check: pre: %s, post: %s. "
+ "Need to specify --override_timestamp OR --downgrade to allow "
+ "building the incremental." % (pre_timestamp, post_timestamp))
+
+
+def CalculateRuntimeDevicesAndFingerprints(build_info, boot_variable_values):
+ """Returns a tuple of sets for runtime devices and fingerprints"""
+
+ device_names = {build_info.device}
+ fingerprints = {build_info.fingerprint}
+
+ if not boot_variable_values:
+ return device_names, fingerprints
+
+ # Calculate all possible combinations of the values for the boot variables.
+ keys = boot_variable_values.keys()
+ value_list = boot_variable_values.values()
+ combinations = [dict(zip(keys, values))
+ for values in itertools.product(*value_list)]
+ for placeholder_values in combinations:
+ # Reload the info_dict as some build properties may change their values
+ # based on the value of ro.boot* properties.
+ info_dict = copy.deepcopy(build_info.info_dict)
+ for partition in PARTITIONS_WITH_CARE_MAP:
+ partition_prop_key = "{}.build.prop".format(partition)
+ input_file = info_dict[partition_prop_key].input_file
+ if isinstance(input_file, zipfile.ZipFile):
+ with zipfile.ZipFile(input_file.filename) as input_zip:
+ info_dict[partition_prop_key] = \
+ PartitionBuildProps.FromInputFile(input_zip, partition,
+ placeholder_values)
+ else:
+ info_dict[partition_prop_key] = \
+ PartitionBuildProps.FromInputFile(input_file, partition,
+ placeholder_values)
+ info_dict["build.prop"] = info_dict["system.build.prop"]
+
+ new_build_info = BuildInfo(info_dict, build_info.oem_dicts)
+ device_names.add(new_build_info.device)
+ fingerprints.add(new_build_info.fingerprint)
+ return device_names, fingerprints
+
+
+class PropertyFiles(object):
+ """A class that computes the property-files string for an OTA package.
+
+ A property-files string is a comma-separated string that contains the
+ offset/size info for an OTA package. The entries, which must be ZIP_STORED,
+ can be fetched directly with the package URL along with the offset/size info.
+ These strings can be used for streaming A/B OTAs, or allowing an updater to
+ download package metadata entry directly, without paying the cost of
+ downloading entire package.
+
+ Computing the final property-files string requires two passes. Because doing
+ the whole package signing (with signapk.jar) will possibly reorder the ZIP
+ entries, which may in turn invalidate earlier computed ZIP entry offset/size
+ values.
+
+ This class provides functions to be called for each pass. The general flow is
+ as follows.
+
+ property_files = PropertyFiles()
+ # The first pass, which writes placeholders before doing initial signing.
+ property_files.Compute()
+ SignOutput()
+
+ # The second pass, by replacing the placeholders with actual data.
+ property_files.Finalize()
+ SignOutput()
+
+ And the caller can additionally verify the final result.
+
+ property_files.Verify()
+ """
+
+ def __init__(self):
+ self.name = None
+ self.required = ()
+ self.optional = ()
+
+ def Compute(self, input_zip):
+ """Computes and returns a property-files string with placeholders.
+
+ We reserve extra space for the offset and size of the metadata entry itself,
+ although we don't know the final values until the package gets signed.
+
+ Args:
+ input_zip: The input ZIP file.
+
+ Returns:
+ A string with placeholders for the metadata offset/size info, e.g.
+ "payload.bin:679:343,payload_properties.txt:378:45,metadata: ".
+ """
+ return self.GetPropertyFilesString(input_zip, reserve_space=True)
+
+ class InsufficientSpaceException(Exception):
+ pass
+
+ def Finalize(self, input_zip, reserved_length):
+ """Finalizes a property-files string with actual METADATA offset/size info.
+
+ The input ZIP file has been signed, with the ZIP entries in the desired
+ place (signapk.jar will possibly reorder the ZIP entries). Now we compute
+ the ZIP entry offsets and construct the property-files string with actual
+ data. Note that during this process, we must pad the property-files string
+ to the reserved length, so that the METADATA entry size remains the same.
+ Otherwise the entries' offsets and sizes may change again.
+
+ Args:
+ input_zip: The input ZIP file.
+ reserved_length: The reserved length of the property-files string during
+ the call to Compute(). The final string must be no more than this
+ size.
+
+ Returns:
+ A property-files string including the metadata offset/size info, e.g.
+ "payload.bin:679:343,payload_properties.txt:378:45,metadata:69:379 ".
+
+ Raises:
+ InsufficientSpaceException: If the reserved length is insufficient to hold
+ the final string.
+ """
+ result = self.GetPropertyFilesString(input_zip, reserve_space=False)
+ if len(result) > reserved_length:
+ raise self.InsufficientSpaceException(
+ 'Insufficient reserved space: reserved={}, actual={}'.format(
+ reserved_length, len(result)))
+
+ result += ' ' * (reserved_length - len(result))
+ return result
+
+ def Verify(self, input_zip, expected):
+ """Verifies the input ZIP file contains the expected property-files string.
+
+ Args:
+ input_zip: The input ZIP file.
+ expected: The property-files string that's computed from Finalize().
+
+ Raises:
+ AssertionError: On finding a mismatch.
+ """
+ actual = self.GetPropertyFilesString(input_zip)
+ assert actual == expected, \
+ "Mismatching streaming metadata: {} vs {}.".format(actual, expected)
+
+ def GetPropertyFilesString(self, zip_file, reserve_space=False):
+ """
+ Constructs the property-files string per request.
+
+ Args:
+ zip_file: The input ZIP file.
+ reserved_length: The reserved length of the property-files string.
+
+ Returns:
+ A property-files string including the metadata offset/size info, e.g.
+ "payload.bin:679:343,payload_properties.txt:378:45,metadata: ".
+ """
+
+ def ComputeEntryOffsetSize(name):
+ """Computes the zip entry offset and size."""
+ info = zip_file.getinfo(name)
+ offset = info.header_offset
+ offset += zipfile.sizeFileHeader
+ offset += len(info.extra) + len(info.filename)
+ size = info.file_size
+ return '%s:%d:%d' % (os.path.basename(name), offset, size)
+
+ tokens = []
+ tokens.extend(self._GetPrecomputed(zip_file))
+ for entry in self.required:
+ tokens.append(ComputeEntryOffsetSize(entry))
+ for entry in self.optional:
+ if entry in zip_file.namelist():
+ tokens.append(ComputeEntryOffsetSize(entry))
+
+ # 'META-INF/com/android/metadata' is required. We don't know its actual
+ # offset and length (as well as the values for other entries). So we reserve
+ # 15-byte as a placeholder ('offset:length'), which is sufficient to cover
+ # the space for metadata entry. Because 'offset' allows a max of 10-digit
+ # (i.e. ~9 GiB), with a max of 4-digit for the length. Note that all the
+ # reserved space serves the metadata entry only.
+ if reserve_space:
+ tokens.append('metadata:' + ' ' * 15)
+ else:
+ tokens.append(ComputeEntryOffsetSize(METADATA_NAME))
+
+ return ','.join(tokens)
+
+ def _GetPrecomputed(self, input_zip):
+ """Computes the additional tokens to be included into the property-files.
+
+ This applies to tokens without actual ZIP entries, such as
+ payload_metadata.bin. We want to expose the offset/size to updaters, so
+ that they can download the payload metadata directly with the info.
+
+ Args:
+ input_zip: The input zip file.
+
+ Returns:
+ A list of strings (tokens) to be added to the property-files string.
+ """
+ # pylint: disable=no-self-use
+ # pylint: disable=unused-argument
+ return []
+
+
+def SignOutput(temp_zip_name, output_zip_name):
+ pw = OPTIONS.key_passwords[OPTIONS.package_key]
+
+ SignFile(temp_zip_name, output_zip_name, OPTIONS.package_key, pw,
+ whole_file=True)
diff --git a/tools/releasetools/test_non_ab_ota.py b/tools/releasetools/test_non_ab_ota.py
new file mode 100644
index 0000000000..ee1b4113c9
--- /dev/null
+++ b/tools/releasetools/test_non_ab_ota.py
@@ -0,0 +1,169 @@
+#
+# Copyright (C) 2020 The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import copy
+import zipfile
+
+import common
+import test_utils
+
+from non_ab_ota import NonAbOtaPropertyFiles, WriteFingerprintAssertion
+from test_utils import PropertyFilesTestCase
+
+
+class NonAbOtaPropertyFilesTest(PropertyFilesTestCase):
+ """Additional validity checks specialized for NonAbOtaPropertyFiles."""
+ def setUp(self):
+ common.OPTIONS.no_signing = False
+ def test_init(self):
+ property_files = NonAbOtaPropertyFiles()
+ self.assertEqual('ota-property-files', property_files.name)
+ self.assertEqual((), property_files.required)
+ self.assertEqual((), property_files.optional)
+
+ def test_Compute(self):
+ entries = ()
+ zip_file = self.construct_zip_package(entries)
+ property_files = NonAbOtaPropertyFiles()
+ with zipfile.ZipFile(zip_file) as zip_fp:
+ property_files_string = property_files.Compute(zip_fp)
+
+ tokens = self._parse_property_files_string(property_files_string)
+ self.assertEqual(1, len(tokens))
+ self._verify_entries(zip_file, tokens, entries)
+
+ def test_Finalize(self):
+ entries = [
+ 'META-INF/com/android/metadata',
+ ]
+ zip_file = self.construct_zip_package(entries)
+ property_files = NonAbOtaPropertyFiles()
+ with zipfile.ZipFile(zip_file) as zip_fp:
+ raw_metadata = property_files.GetPropertyFilesString(
+ zip_fp, reserve_space=False)
+ property_files_string = property_files.Finalize(zip_fp, len(raw_metadata))
+ tokens = self._parse_property_files_string(property_files_string)
+
+ self.assertEqual(1, len(tokens))
+ # 'META-INF/com/android/metadata' will be key'd as 'metadata'.
+ entries[0] = 'metadata'
+ self._verify_entries(zip_file, tokens, entries)
+
+ def test_Verify(self):
+ entries = (
+ 'META-INF/com/android/metadata',
+ )
+ zip_file = self.construct_zip_package(entries)
+ property_files = NonAbOtaPropertyFiles()
+ with zipfile.ZipFile(zip_file) as zip_fp:
+ raw_metadata = property_files.GetPropertyFilesString(
+ zip_fp, reserve_space=False)
+
+ property_files.Verify(zip_fp, raw_metadata)
+
+class NonAbOTATest(test_utils.ReleaseToolsTestCase):
+ TEST_TARGET_INFO_DICT = {
+ 'build.prop': common.PartitionBuildProps.FromDictionary(
+ 'system', {
+ 'ro.product.device': 'product-device',
+ 'ro.build.fingerprint': 'build-fingerprint-target',
+ 'ro.build.version.incremental': 'build-version-incremental-target',
+ 'ro.build.version.sdk': '27',
+ 'ro.build.version.security_patch': '2017-12-01',
+ 'ro.build.date.utc': '1500000000'}
+ )
+ }
+ TEST_INFO_DICT_USES_OEM_PROPS = {
+ 'build.prop': common.PartitionBuildProps.FromDictionary(
+ 'system', {
+ 'ro.product.name': 'product-name',
+ 'ro.build.thumbprint': 'build-thumbprint',
+ 'ro.build.bar': 'build-bar'}
+ ),
+ 'vendor.build.prop': common.PartitionBuildProps.FromDictionary(
+ 'vendor', {
+ 'ro.vendor.build.fingerprint': 'vendor-build-fingerprint'}
+ ),
+ 'property1': 'value1',
+ 'property2': 4096,
+ 'oem_fingerprint_properties': 'ro.product.device ro.product.brand',
+ }
+ TEST_OEM_DICTS = [
+ {
+ 'ro.product.brand': 'brand1',
+ 'ro.product.device': 'device1',
+ },
+ {
+ 'ro.product.brand': 'brand2',
+ 'ro.product.device': 'device2',
+ },
+ {
+ 'ro.product.brand': 'brand3',
+ 'ro.product.device': 'device3',
+ },
+ ]
+ def test_WriteFingerprintAssertion_without_oem_props(self):
+ target_info = common.BuildInfo(self.TEST_TARGET_INFO_DICT, None)
+ source_info_dict = copy.deepcopy(self.TEST_TARGET_INFO_DICT)
+ source_info_dict['build.prop'].build_props['ro.build.fingerprint'] = (
+ 'source-build-fingerprint')
+ source_info = common.BuildInfo(source_info_dict, None)
+
+ script_writer = test_utils.MockScriptWriter()
+ WriteFingerprintAssertion(script_writer, target_info, source_info)
+ self.assertEqual(
+ [('AssertSomeFingerprint', 'source-build-fingerprint',
+ 'build-fingerprint-target')],
+ script_writer.lines)
+
+ def test_WriteFingerprintAssertion_with_source_oem_props(self):
+ target_info = common.BuildInfo(self.TEST_TARGET_INFO_DICT, None)
+ source_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
+ self.TEST_OEM_DICTS)
+
+ script_writer = test_utils.MockScriptWriter()
+ WriteFingerprintAssertion(script_writer, target_info, source_info)
+ self.assertEqual(
+ [('AssertFingerprintOrThumbprint', 'build-fingerprint-target',
+ 'build-thumbprint')],
+ script_writer.lines)
+
+ def test_WriteFingerprintAssertion_with_target_oem_props(self):
+ target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
+ self.TEST_OEM_DICTS)
+ source_info = common.BuildInfo(self.TEST_TARGET_INFO_DICT, None)
+
+ script_writer = test_utils.MockScriptWriter()
+ WriteFingerprintAssertion(script_writer, target_info, source_info)
+ self.assertEqual(
+ [('AssertFingerprintOrThumbprint', 'build-fingerprint-target',
+ 'build-thumbprint')],
+ script_writer.lines)
+
+ def test_WriteFingerprintAssertion_with_both_oem_props(self):
+ target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
+ self.TEST_OEM_DICTS)
+ source_info_dict = copy.deepcopy(self.TEST_INFO_DICT_USES_OEM_PROPS)
+ source_info_dict['build.prop'].build_props['ro.build.thumbprint'] = (
+ 'source-build-thumbprint')
+ source_info = common.BuildInfo(source_info_dict, self.TEST_OEM_DICTS)
+
+ script_writer = test_utils.MockScriptWriter()
+ WriteFingerprintAssertion(script_writer, target_info, source_info)
+ self.assertEqual(
+ [('AssertSomeThumbprint', 'build-thumbprint',
+ 'source-build-thumbprint')],
+ script_writer.lines)
diff --git a/tools/releasetools/test_ota_from_target_files.py b/tools/releasetools/test_ota_from_target_files.py
index 07b2e05e4c..52aa487031 100644
--- a/tools/releasetools/test_ota_from_target_files.py
+++ b/tools/releasetools/test_ota_from_target_files.py
@@ -21,14 +21,15 @@ import zipfile
import common
import test_utils
+from ota_utils import CalculateRuntimeDevicesAndFingerprints
from ota_from_target_files import (
_LoadOemDicts, AbOtaPropertyFiles, FinalizeMetadata,
GetPackageMetadata, GetTargetFilesZipForSecondaryImages,
- GetTargetFilesZipWithoutPostinstallConfig, NonAbOtaPropertyFiles,
+ GetTargetFilesZipWithoutPostinstallConfig,
Payload, PayloadSigner, POSTINSTALL_CONFIG, PropertyFiles,
- StreamingPropertyFiles, WriteFingerprintAssertion,
- CalculateRuntimeDevicesAndFingerprints)
-
+ StreamingPropertyFiles)
+from non_ab_ota import NonAbOtaPropertyFiles
+from test_utils import PropertyFilesTestCase
def construct_target_files(secondary=False):
"""Returns a target-files.zip file for generating OTA packages."""
@@ -149,20 +150,6 @@ class OtaFromTargetFilesTest(test_utils.ReleaseToolsTestCase):
'oem_fingerprint_properties': 'ro.product.device ro.product.brand',
}
- TEST_OEM_DICTS = [
- {
- 'ro.product.brand': 'brand1',
- 'ro.product.device': 'device1',
- },
- {
- 'ro.product.brand': 'brand2',
- 'ro.product.device': 'device2',
- },
- {
- 'ro.product.brand': 'brand3',
- 'ro.product.device': 'device3',
- },
- ]
def setUp(self):
self.testdata_dir = test_utils.get_testdata_dir()
@@ -529,59 +516,6 @@ class OtaFromTargetFilesTest(test_utils.ReleaseToolsTestCase):
FinalizeMetadata(metadata, zip_file, output_file, needed_property_files)
self.assertIn('ota-test-property-files', metadata)
- def test_WriteFingerprintAssertion_without_oem_props(self):
- target_info = common.BuildInfo(self.TEST_TARGET_INFO_DICT, None)
- source_info_dict = copy.deepcopy(self.TEST_TARGET_INFO_DICT)
- source_info_dict['build.prop'].build_props['ro.build.fingerprint'] = (
- 'source-build-fingerprint')
- source_info = common.BuildInfo(source_info_dict, None)
-
- script_writer = test_utils.MockScriptWriter()
- WriteFingerprintAssertion(script_writer, target_info, source_info)
- self.assertEqual(
- [('AssertSomeFingerprint', 'source-build-fingerprint',
- 'build-fingerprint-target')],
- script_writer.lines)
-
- def test_WriteFingerprintAssertion_with_source_oem_props(self):
- target_info = common.BuildInfo(self.TEST_TARGET_INFO_DICT, None)
- source_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
- self.TEST_OEM_DICTS)
-
- script_writer = test_utils.MockScriptWriter()
- WriteFingerprintAssertion(script_writer, target_info, source_info)
- self.assertEqual(
- [('AssertFingerprintOrThumbprint', 'build-fingerprint-target',
- 'build-thumbprint')],
- script_writer.lines)
-
- def test_WriteFingerprintAssertion_with_target_oem_props(self):
- target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
- self.TEST_OEM_DICTS)
- source_info = common.BuildInfo(self.TEST_TARGET_INFO_DICT, None)
-
- script_writer = test_utils.MockScriptWriter()
- WriteFingerprintAssertion(script_writer, target_info, source_info)
- self.assertEqual(
- [('AssertFingerprintOrThumbprint', 'build-fingerprint-target',
- 'build-thumbprint')],
- script_writer.lines)
-
- def test_WriteFingerprintAssertion_with_both_oem_props(self):
- target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
- self.TEST_OEM_DICTS)
- source_info_dict = copy.deepcopy(self.TEST_INFO_DICT_USES_OEM_PROPS)
- source_info_dict['build.prop'].build_props['ro.build.thumbprint'] = (
- 'source-build-thumbprint')
- source_info = common.BuildInfo(source_info_dict, self.TEST_OEM_DICTS)
-
- script_writer = test_utils.MockScriptWriter()
- WriteFingerprintAssertion(script_writer, target_info, source_info)
- self.assertEqual(
- [('AssertSomeThumbprint', 'build-thumbprint',
- 'source-build-thumbprint')],
- script_writer.lines)
-
class TestPropertyFiles(PropertyFiles):
"""A class that extends PropertyFiles for testing purpose."""
@@ -598,41 +532,8 @@ class TestPropertyFiles(PropertyFiles):
'optional-entry2',
)
+class PropertyFilesTest(PropertyFilesTestCase):
-class PropertyFilesTest(test_utils.ReleaseToolsTestCase):
-
- def setUp(self):
- common.OPTIONS.no_signing = False
-
- @staticmethod
- def construct_zip_package(entries):
- zip_file = common.MakeTempFile(suffix='.zip')
- with zipfile.ZipFile(zip_file, 'w') as zip_fp:
- for entry in entries:
- zip_fp.writestr(
- entry,
- entry.replace('.', '-').upper(),
- zipfile.ZIP_STORED)
- return zip_file
-
- @staticmethod
- def _parse_property_files_string(data):
- result = {}
- for token in data.split(','):
- name, info = token.split(':', 1)
- result[name] = info
- return result
-
- def _verify_entries(self, input_file, tokens, entries):
- for entry in entries:
- offset, size = map(int, tokens[entry].split(':'))
- with open(input_file, 'rb') as input_fp:
- input_fp.seek(offset)
- if entry == 'metadata':
- expected = b'META-INF/COM/ANDROID/METADATA'
- else:
- expected = entry.replace('.', '-').upper().encode()
- self.assertEqual(expected, input_fp.read(size))
@test_utils.SkipIfExternalToolsUnavailable()
def test_Compute(self):
@@ -753,7 +654,7 @@ class PropertyFilesTest(test_utils.ReleaseToolsTestCase):
AssertionError, property_files.Verify, zip_fp, raw_metadata + 'x')
-class StreamingPropertyFilesTest(PropertyFilesTest):
+class StreamingPropertyFilesTest(PropertyFilesTestCase):
"""Additional validity checks specialized for StreamingPropertyFiles."""
def test_init(self):
@@ -834,7 +735,7 @@ class StreamingPropertyFilesTest(PropertyFilesTest):
AssertionError, property_files.Verify, zip_fp, raw_metadata + 'x')
-class AbOtaPropertyFilesTest(PropertyFilesTest):
+class AbOtaPropertyFilesTest(PropertyFilesTestCase):
"""Additional validity checks specialized for AbOtaPropertyFiles."""
# The size for payload and metadata signature size.
@@ -1002,56 +903,6 @@ class AbOtaPropertyFilesTest(PropertyFilesTest):
property_files.Verify(zip_fp, raw_metadata)
-class NonAbOtaPropertyFilesTest(PropertyFilesTest):
- """Additional validity checks specialized for NonAbOtaPropertyFiles."""
-
- def test_init(self):
- property_files = NonAbOtaPropertyFiles()
- self.assertEqual('ota-property-files', property_files.name)
- self.assertEqual((), property_files.required)
- self.assertEqual((), property_files.optional)
-
- def test_Compute(self):
- entries = ()
- zip_file = self.construct_zip_package(entries)
- property_files = NonAbOtaPropertyFiles()
- with zipfile.ZipFile(zip_file) as zip_fp:
- property_files_string = property_files.Compute(zip_fp)
-
- tokens = self._parse_property_files_string(property_files_string)
- self.assertEqual(1, len(tokens))
- self._verify_entries(zip_file, tokens, entries)
-
- def test_Finalize(self):
- entries = [
- 'META-INF/com/android/metadata',
- ]
- zip_file = self.construct_zip_package(entries)
- property_files = NonAbOtaPropertyFiles()
- with zipfile.ZipFile(zip_file) as zip_fp:
- raw_metadata = property_files.GetPropertyFilesString(
- zip_fp, reserve_space=False)
- property_files_string = property_files.Finalize(zip_fp, len(raw_metadata))
- tokens = self._parse_property_files_string(property_files_string)
-
- self.assertEqual(1, len(tokens))
- # 'META-INF/com/android/metadata' will be key'd as 'metadata'.
- entries[0] = 'metadata'
- self._verify_entries(zip_file, tokens, entries)
-
- def test_Verify(self):
- entries = (
- 'META-INF/com/android/metadata',
- )
- zip_file = self.construct_zip_package(entries)
- property_files = NonAbOtaPropertyFiles()
- with zipfile.ZipFile(zip_file) as zip_fp:
- raw_metadata = property_files.GetPropertyFilesString(
- zip_fp, reserve_space=False)
-
- property_files.Verify(zip_fp, raw_metadata)
-
-
class PayloadSignerTest(test_utils.ReleaseToolsTestCase):
SIGFILE = 'sigfile.bin'
diff --git a/tools/releasetools/test_utils.py b/tools/releasetools/test_utils.py
index e99975765b..65092d84de 100755
--- a/tools/releasetools/test_utils.py
+++ b/tools/releasetools/test_utils.py
@@ -25,6 +25,7 @@ import os.path
import struct
import sys
import unittest
+import zipfile
import common
@@ -192,6 +193,41 @@ class ReleaseToolsTestCase(unittest.TestCase):
def tearDown(self):
common.Cleanup()
+class PropertyFilesTestCase(ReleaseToolsTestCase):
+
+ @staticmethod
+ def construct_zip_package(entries):
+ zip_file = common.MakeTempFile(suffix='.zip')
+ with zipfile.ZipFile(zip_file, 'w') as zip_fp:
+ for entry in entries:
+ zip_fp.writestr(
+ entry,
+ entry.replace('.', '-').upper(),
+ zipfile.ZIP_STORED)
+ return zip_file
+
+ @staticmethod
+ def _parse_property_files_string(data):
+ result = {}
+ for token in data.split(','):
+ name, info = token.split(':', 1)
+ result[name] = info
+ return result
+
+ def setUp(self):
+ common.OPTIONS.no_signing = False
+
+ def _verify_entries(self, input_file, tokens, entries):
+ for entry in entries:
+ offset, size = map(int, tokens[entry].split(':'))
+ with open(input_file, 'rb') as input_fp:
+ input_fp.seek(offset)
+ if entry == 'metadata':
+ expected = b'META-INF/COM/ANDROID/METADATA'
+ else:
+ expected = entry.replace('.', '-').upper().encode()
+ self.assertEqual(expected, input_fp.read(size))
+
if __name__ == '__main__':
testsuite = unittest.TestLoader().discover(