Skip to content

Commit

Permalink
Add new recipe to for Edgelock2GO auto registration
Browse files Browse the repository at this point in the history
This recipe requires a factory with Edgelock2GO enabled and
meta-subscriber-overrides additions for default.env and root.crt

Signed-off-by: Andy Doan <[email protected]>
  • Loading branch information
doanac committed Jul 21, 2022
1 parent 5cc7c93 commit da608d9
Show file tree
Hide file tree
Showing 5 changed files with 298 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
SUMMARY = "A systemd oneshot helper to auto register a device using EdgeLock2GO"
LICENSE = "MIT"
LIC_FILES_CHKSUM = "file://${COMMON_LICENSE_DIR}/MIT;md5=0835ade698e0bcf8506ecda2f7b4f302"

RDEPENDS:${PN} += "plug-and-trust-seteec python3-core python3-plug-and-trust-ssscli opensc"

SRC_URI = " \
file://lmp-el2go-auto-register.service \
file://lmp-el2go-auto-register \
file://default.env \
file://root.crt \
"

inherit systemd

SYSTEMD_SERVICE:${PN} = "lmp-el2go-auto-register.service"

do_install() {
install -d ${D}${systemd_system_unitdir}
install -m 0644 ${WORKDIR}/lmp-el2go-auto-register.service ${D}${systemd_system_unitdir}
install -d ${D}${bindir}
install -m 0755 ${WORKDIR}/lmp-el2go-auto-register ${D}${bindir}
install -d ${D}${sysconfdir}/default
install -m 0644 ${WORKDIR}/default.env ${D}${sysconfdir}/default/lmp-el2go-auto-register
install -d ${D}${datadir}/lmp-el2go-auto-register
install -m 0644 ${WORKDIR}/root.crt ${D}${datadir}/lmp-el2go-auto-register
}
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,258 @@
#!/usr/bin/python3
# SPDX-License-Identifier: MIT
#
# Copyright (c) 2022, Foundries.io Ltd.

from contextlib import contextmanager
import os
import logging
import subprocess
import sys
from time import sleep
from tempfile import NamedTemporaryFile
from typing import List, NamedTuple


DAEMON_INTERVAL = os.environ.get("DAEMON_INTERVAL", "300")
PIN = os.environ.get("PKCS11_PIN", "87654321")
SO_PIN = os.environ.get("PKCS11_SOPIN", "12345678")
SOTA_DIR = os.environ.get("SOTA_DIR", "/var/sota")
HANDLERS = os.environ.get("HANDLERS", "aws-iot,aktualizr-lite")

REPO_ID = os.environ["REPOID"]

logging.basicConfig(level="INFO", format="%(asctime)s %(levelname)s: %(message)s")
log = logging.getLogger()


def run_quietly(args: List[str]):
try:
subprocess.check_output(args, stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as e:
log.error("Unable to run: %s", args)
print("| " + e.output.decode().replace("\n", "\n| "), file=sys.stderr)
sys.exit(1)


class Pkcs11:
LIB = "/usr/lib/libckteec.so.0.1"

def gen_kp(self, slot: str, label: str):
args = [
"pkcs11-tool",
f"--module={self.LIB}",
f"--pin={PIN}",
"--keypairgen",
"--key-type=EC:prime256v1",
f"--id={slot}",
f"--label={label}",
"--token-label=aktualizr",
]
run_quietly(args)

def write_object(self, type: str, objfile: str, slot: str, label: str):
args = [
"pkcs11-tool",
f"--module={self.LIB}",
f"--pin={PIN}",
"--login",
f"--write-object={objfile}",
f"--type={type}",
f"--id={slot}",
f"--label={label}",
]
run_quietly(args)

def has_labels(self, labels: List[str]) -> bool:
args = ["pkcs11-tool", f"--module={self.LIB}", f"--pin={PIN}", "--list-objects"]
out = subprocess.check_output(args)
missing = {x: 1 for x in labels}
for line in out.decode().splitlines():
for l in labels:
if line.find(l) != -1:
try:
del missing[l]
except KeyError:
pass # already removed
return len(missing) == 0

@classmethod
def _is_initialized(cls) -> bool:
args = ["pkcs11-tool", f"--module={cls.LIB}", f"--pin={PIN}", "--list-objects"]
try:
subprocess.check_output(args, stderr=subprocess.STDOUT)
except subprocess.CalledProcessError:
return False
return True

@classmethod
def _initialize(cls):
args = ["pkcs11-tool", f"--module={cls.LIB}", f"--so-pin={SO_PIN}"]
subprocess.check_call(args + ["--init-token", "--label", "aktualizr"])
subprocess.check_call(args + ["--init-pin", f"--pin={PIN}"])

@classmethod
def get_initialized(cls) -> "Pkcs11":
if cls._is_initialized():
log.info("Pkcs11 slots already initialized")
else:
log.info("Initializing pkcs11 slots...")
cls._initialize()
return cls()


class SssCli:
@contextmanager
def i2c_session(*args, **kwds):
log.info("Connecting to ssscli...")
subprocess.check_call(["ssscli", "connect", "se05x", "t1oi2c", "none"])
try:
yield
finally:
log.info("Disconnecting from ssscli")
subprocess.check_call(["ssscli", "disconnect"])

@classmethod
def read_id_list(cls) -> List[str]:
with cls.i2c_session():
out = subprocess.check_output(["ssscli", "se05x", "readidlist"])
lines = out.decode().splitlines()
# lines are like: Key-Id: 0Xf0000003 BINARY .....
return [x.split(" ")[1] for x in lines if x]

@classmethod
def get_cert(cls, keyid: str, certfile: str):
with cls.i2c_session():
subprocess.check_call(["ssscli", "get", "cert", keyid, certfile])


def run_agent() -> bool:
log.info("Running EdgeLock2GO agent...")
try:
subprocess.check_output(["nxp_iot_agent_demo"], stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as e:
print("| " + e.output.decode().replace("\n", "\n| "), file=sys.stderr)
return False
log.info("EdgeLock2GO agent check-in complete")
return True


class AwsKpHandler(NamedTuple):
key_id: str = "0X83000044"
crt_id: str = "0X83000045"

def process(self, pkcs11: Pkcs11):
key_label = self.key_id.replace("0X", "SE_")
crt_label = self.crt_id.replace("0X", "SE_")
if pkcs11.has_labels([key_label, crt_label]):
log.info("aws-iot already provisioned")
return
log.info("Processing keys for aws-iot...")
pkcs11.gen_kp("04", key_label)
with NamedTemporaryFile() as f:
SssCli.get_cert(self.crt_id, f.name)
pkcs11.write_object("cert", f.name, "05", crt_label)


class AkliteKpHandler(NamedTuple):
key_id: str = "0X83000042"
crt_id: str = "0X83000043"

@staticmethod
def _get_tag():
tag = ""
with open("/etc/os-release") as f:
for line in f:
key, val = line.split("=", 1)
if key == "LMP_FACTORY_TAG":
return val.strip().split('"')[1]
sys.exit("Unable to parse LMP_MACHINE and LMP_TAG from /etc/os-release")

def process(self, pkcs11: Pkcs11):
key_label = self.key_id.replace("0X", "SE_")
crt_label = self.crt_id.replace("0X", "SE_")

if pkcs11.has_labels([key_label, crt_label]):
if os.path.exists(os.path.join(SOTA_DIR, "sql.db")):
log.info("Aktualizr-lite already provisioned")
return
else:
log.info("Aktualizr-lite keys in place")
else:
log.info("Processing keys for aktualizr-lite...")
pkcs11.gen_kp("01", key_label)
with NamedTemporaryFile() as f:
SssCli.get_cert(self.crt_id, f.name)
pkcs11.write_object("cert", f.name, "03", crt_label)

tag = self._get_tag()
sota_toml = os.path.join(SOTA_DIR, "sota.toml")
log.info("Provisioning %s", sota_toml)
toml = f"""
[tls]
server = "https://{REPO_ID}.ota-lite.foundries.io:8443"
ca_source = "file"
pkey_source = "pkcs11"
cert_source = "pkcs11"
[provision]
server = "https://{REPO_ID}.ota-lite.foundries.io:8443"
[uptane]
repo_server = "https://{REPO_ID}.ota-lite.foundries.io:8443/repo"
key_source = "file"
[pacman]
type = "ostree+compose_apps"
ostree_server = "https://{REPO_ID}.ostree.foundries.io:8443/ostree"
packages_file = "/usr/package.manifest"
tags = "{tag}"
compose_apps_root = "{SOTA_DIR}/compose-apps"
[storage]
type = "sqlite"
path = "{SOTA_DIR}/"
[import]
tls_cacert_path = "/usr/share/lmp-el2go-auto-register/root.crt"
[p11]
module = "{Pkcs11.LIB}"
pass = "{PIN}"
tls_pkey_id = "01"
tls_clientcert_id = "03"
"""
with open(sota_toml, "w") as f:
f.write(toml)
log.info("Starting aktualizr-lite")
subprocess.check_call(["systemctl", "start", "aktualizr-lite"])


def main():
handlers = {
"aws-iot": AwsKpHandler(),
"aktualizr-lite": AkliteKpHandler(),
}
handler_names = [x.strip() for x in HANDLERS.split(",") if x]

interval = int(DAEMON_INTERVAL)
pkcs11 = Pkcs11.get_initialized()

while True:
if run_agent():
ids = SssCli.read_id_list()
for name in handler_names:
handler = handlers[name]
if handler.key_id in ids and handler.crt_id in ids:
handler.process(pkcs11)
break
else:
log.info(
"EdgeLock2GO cloud service reported an error, retrying in %d seconds",
interval,
)
sleep(interval)


if __name__ == "__main__":
main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[Unit]
Description=Script to auto-register device into Factory via EdgeLock2GO
Wants=network-online.target time-sync.target systemd-time-wait-sync.service
After=network-online.target time-sync.target systemd-time-wait-sync.service
ConditionPathExists=!/var/sota/sql.db

[Service]
EnvironmentFile=-/etc/default/lmp-el2go-auto-register
ExecStart=/usr/bin/lmp-el2go-auto-register
WorkingDirectory=/run

[Install]
WantedBy=multi-user.target
Empty file.

0 comments on commit da608d9

Please sign in to comment.