Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new recipe to for Edgelock2GO auto registration #750

Merged
merged 1 commit into from
Jul 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
SUMMARY = "A systemd oneshot helper to auto register a device using EdgeLock2GO"
LICENSE = "MIT"
LIC_FILES_CHKSUM = "file://${COMMON_LICENSE_DIR}/MIT;md5=0835ade698e0bcf8506ecda2f7b4f302"

RDEPENDS:${PN} += "plug-and-trust-seteec python3-core python3-plug-and-trust-ssscli opensc"

SRC_URI = " \
file://lmp-el2go-auto-register.service \
file://lmp-el2go-auto-register \
file://default.env \
file://root.crt \
"

inherit systemd

SYSTEMD_SERVICE:${PN} = "lmp-el2go-auto-register.service"

do_install() {
install -d ${D}${systemd_system_unitdir}
install -m 0644 ${WORKDIR}/lmp-el2go-auto-register.service ${D}${systemd_system_unitdir}
install -d ${D}${bindir}
install -m 0755 ${WORKDIR}/lmp-el2go-auto-register ${D}${bindir}
install -d ${D}${sysconfdir}/default
install -m 0644 ${WORKDIR}/default.env ${D}${sysconfdir}/default/lmp-el2go-auto-register
install -d ${D}${datadir}/lmp-el2go-auto-register
install -m 0644 ${WORKDIR}/root.crt ${D}${datadir}/lmp-el2go-auto-register
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,258 @@
#!/usr/bin/python3
# SPDX-License-Identifier: MIT
#
# Copyright (c) 2022, Foundries.io Ltd.

from contextlib import contextmanager
import os
import logging
import subprocess
import sys
from time import sleep
from tempfile import NamedTemporaryFile
from typing import List, NamedTuple


DAEMON_INTERVAL = os.environ.get("DAEMON_INTERVAL", "300")
PIN = os.environ.get("PKCS11_PIN", "87654321")
SO_PIN = os.environ.get("PKCS11_SOPIN", "12345678")
SOTA_DIR = os.environ.get("SOTA_DIR", "/var/sota")
HANDLERS = os.environ.get("HANDLERS", "aws-iot,aktualizr-lite")

REPO_ID = os.environ["REPOID"]

logging.basicConfig(level="INFO", format="%(asctime)s %(levelname)s: %(message)s")
log = logging.getLogger()


def run_quietly(args: List[str]):
try:
subprocess.check_output(args, stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as e:
log.error("Unable to run: %s", args)
print("| " + e.output.decode().replace("\n", "\n| "), file=sys.stderr)
sys.exit(1)


class Pkcs11:
LIB = "/usr/lib/libckteec.so.0.1"

def gen_kp(self, slot: str, label: str):
args = [
"pkcs11-tool",
f"--module={self.LIB}",
f"--pin={PIN}",
"--keypairgen",
"--key-type=EC:prime256v1",
f"--id={slot}",
f"--label={label}",
"--token-label=aktualizr",
]
run_quietly(args)

def write_object(self, type: str, objfile: str, slot: str, label: str):
args = [
"pkcs11-tool",
f"--module={self.LIB}",
f"--pin={PIN}",
"--login",
f"--write-object={objfile}",
f"--type={type}",
f"--id={slot}",
f"--label={label}",
]
run_quietly(args)

def has_labels(self, labels: List[str]) -> bool:
args = ["pkcs11-tool", f"--module={self.LIB}", f"--pin={PIN}", "--list-objects"]
out = subprocess.check_output(args)
missing = {x: 1 for x in labels}
for line in out.decode().splitlines():
for l in labels:
if line.find(l) != -1:
try:
del missing[l]
except KeyError:
pass # already removed
return len(missing) == 0

@classmethod
def _is_initialized(cls) -> bool:
args = ["pkcs11-tool", f"--module={cls.LIB}", f"--pin={PIN}", "--list-objects"]
try:
subprocess.check_output(args, stderr=subprocess.STDOUT)
except subprocess.CalledProcessError:
return False
return True

@classmethod
def _initialize(cls):
args = ["pkcs11-tool", f"--module={cls.LIB}", f"--so-pin={SO_PIN}"]
subprocess.check_call(args + ["--init-token", "--label", "aktualizr"])
subprocess.check_call(args + ["--init-pin", f"--pin={PIN}"])

@classmethod
def get_initialized(cls) -> "Pkcs11":
if cls._is_initialized():
log.info("Pkcs11 slots already initialized")
else:
log.info("Initializing pkcs11 slots...")
cls._initialize()
return cls()


class SssCli:
@contextmanager
def i2c_session(*args, **kwds):
log.info("Connecting to ssscli...")
subprocess.check_call(["ssscli", "connect", "se05x", "t1oi2c", "none"])
try:
yield
finally:
log.info("Disconnecting from ssscli")
subprocess.check_call(["ssscli", "disconnect"])

@classmethod
def read_id_list(cls) -> List[str]:
with cls.i2c_session():
out = subprocess.check_output(["ssscli", "se05x", "readidlist"])
lines = out.decode().splitlines()
# lines are like: Key-Id: 0Xf0000003 BINARY .....
return [x.split(" ")[1] for x in lines if x]

@classmethod
def get_cert(cls, keyid: str, certfile: str):
with cls.i2c_session():
subprocess.check_call(["ssscli", "get", "cert", keyid, certfile])


def run_agent() -> bool:
log.info("Running EdgeLock2GO agent...")
try:
subprocess.check_output(["nxp_iot_agent_demo"], stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as e:
print("| " + e.output.decode().replace("\n", "\n| "), file=sys.stderr)
return False
log.info("EdgeLock2GO agent check-in complete")
return True


class AwsKpHandler(NamedTuple):
key_id: str = "0X83000044"
crt_id: str = "0X83000045"

def process(self, pkcs11: Pkcs11):
key_label = self.key_id.replace("0X", "SE_")
crt_label = self.crt_id.replace("0X", "SE_")
if pkcs11.has_labels([key_label, crt_label]):
log.info("aws-iot already provisioned")
return
log.info("Processing keys for aws-iot...")
pkcs11.gen_kp("04", key_label)
with NamedTemporaryFile() as f:
SssCli.get_cert(self.crt_id, f.name)
pkcs11.write_object("cert", f.name, "05", crt_label)


class AkliteKpHandler(NamedTuple):
key_id: str = "0X83000042"
crt_id: str = "0X83000043"

@staticmethod
def _get_tag():
tag = ""
with open("/etc/os-release") as f:
for line in f:
key, val = line.split("=", 1)
if key == "LMP_FACTORY_TAG":
return val.strip().split('"')[1]
sys.exit("Unable to parse LMP_MACHINE and LMP_TAG from /etc/os-release")

def process(self, pkcs11: Pkcs11):
key_label = self.key_id.replace("0X", "SE_")
crt_label = self.crt_id.replace("0X", "SE_")

if pkcs11.has_labels([key_label, crt_label]):
if os.path.exists(os.path.join(SOTA_DIR, "sql.db")):
log.info("Aktualizr-lite already provisioned")
return
else:
log.info("Aktualizr-lite keys in place")
else:
log.info("Processing keys for aktualizr-lite...")
pkcs11.gen_kp("01", key_label)
with NamedTemporaryFile() as f:
SssCli.get_cert(self.crt_id, f.name)
pkcs11.write_object("cert", f.name, "03", crt_label)

tag = self._get_tag()
sota_toml = os.path.join(SOTA_DIR, "sota.toml")
log.info("Provisioning %s", sota_toml)
toml = f"""
[tls]
server = "https://{REPO_ID}.ota-lite.foundries.io:8443"
ca_source = "file"
pkey_source = "pkcs11"
cert_source = "pkcs11"

[provision]
server = "https://{REPO_ID}.ota-lite.foundries.io:8443"

[uptane]
repo_server = "https://{REPO_ID}.ota-lite.foundries.io:8443/repo"
key_source = "file"

[pacman]
type = "ostree+compose_apps"
ostree_server = "https://{REPO_ID}.ostree.foundries.io:8443/ostree"
packages_file = "/usr/package.manifest"
tags = "{tag}"
compose_apps_root = "{SOTA_DIR}/compose-apps"

[storage]
type = "sqlite"
path = "{SOTA_DIR}/"

[import]
tls_cacert_path = "/usr/share/lmp-el2go-auto-register/root.crt"

[p11]
module = "{Pkcs11.LIB}"
pass = "{PIN}"
tls_pkey_id = "01"
tls_clientcert_id = "03"
"""
with open(sota_toml, "w") as f:
f.write(toml)
log.info("Starting aktualizr-lite")
subprocess.check_call(["systemctl", "start", "aktualizr-lite"])


def main():
handlers = {
"aws-iot": AwsKpHandler(),
"aktualizr-lite": AkliteKpHandler(),
}
handler_names = [x.strip() for x in HANDLERS.split(",") if x]

interval = int(DAEMON_INTERVAL)
pkcs11 = Pkcs11.get_initialized()

while True:
if run_agent():
ids = SssCli.read_id_list()
for name in handler_names:
handler = handlers[name]
if handler.key_id in ids and handler.crt_id in ids:
handler.process(pkcs11)
break
else:
log.info(
"EdgeLock2GO cloud service reported an error, retrying in %d seconds",
interval,
)
sleep(interval)


if __name__ == "__main__":
main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[Unit]
Description=Script to auto-register device into Factory via EdgeLock2GO
Wants=network-online.target time-sync.target systemd-time-wait-sync.service
After=network-online.target time-sync.target systemd-time-wait-sync.service
ConditionPathExists=!/var/sota/sql.db

[Service]
EnvironmentFile=-/etc/default/lmp-el2go-auto-register
ExecStart=/usr/bin/lmp-el2go-auto-register
WorkingDirectory=/run

[Install]
WantedBy=multi-user.target