diff options
author | Ben Luck <ben+qa@advancedtelematic.com> | 2018-02-22 12:24:29 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-02-22 12:24:29 +0100 |
commit | 6d2e9d364466fc554a3e4af8d7833734bac24a2e (patch) | |
tree | 55b2f00c1fb1f1ac089300b53bb9d7b61a2154f9 /lib | |
parent | 1f6b311c9fc567d7038988cc60be148c3181c111 (diff) | |
parent | 2023bfbbe31d5bb2df6da2af66ff828b2321b1e4 (diff) | |
download | meta-updater-6d2e9d364466fc554a3e4af8d7833734bac24a2e.tar.gz |
Merge pull request #260 from advancedtelematic/test/PRO-4481/hsm
Test/pro 4481/hsm
Diffstat (limited to 'lib')
-rw-r--r-- | lib/oeqa/selftest/updater.py | 299 |
1 files changed, 245 insertions, 54 deletions
diff --git a/lib/oeqa/selftest/updater.py b/lib/oeqa/selftest/updater.py index 0962cb7..1efbba9 100644 --- a/lib/oeqa/selftest/updater.py +++ b/lib/oeqa/selftest/updater.py | |||
@@ -1,6 +1,7 @@ | |||
1 | # pylint: disable=C0111,C0325 | 1 | # pylint: disable=C0111,C0325 |
2 | import os | 2 | import os |
3 | import logging | 3 | import logging |
4 | import re | ||
4 | import subprocess | 5 | import subprocess |
5 | import unittest | 6 | import unittest |
6 | from time import sleep | 7 | from time import sleep |
@@ -15,35 +16,32 @@ class SotaToolsTests(oeSelfTest): | |||
15 | @classmethod | 16 | @classmethod |
16 | def setUpClass(cls): | 17 | def setUpClass(cls): |
17 | logger = logging.getLogger("selftest") | 18 | logger = logging.getLogger("selftest") |
19 | bb_vars = get_bb_vars(['SYSROOT_DESTDIR', 'base_prefix', 'libdir', 'bindir'], | ||
20 | 'aktualizr-native') | ||
21 | cls.sysroot = bb_vars['SYSROOT_DESTDIR'] + bb_vars['base_prefix'] | ||
22 | cls.sysrootbin = bb_vars['SYSROOT_DESTDIR'] + bb_vars['bindir'] | ||
23 | cls.libdir = bb_vars['libdir'] | ||
24 | |||
18 | logger.info('Running bitbake to build aktualizr-native tools') | 25 | logger.info('Running bitbake to build aktualizr-native tools') |
19 | bitbake('aktualizr-native') | 26 | bitbake('aktualizr-native') |
20 | 27 | ||
21 | def test_push_help(self): | 28 | def runNativeCmd(self, cmd, **kwargs): |
22 | bb_vars = get_bb_vars(['SYSROOT_DESTDIR', 'bindir'], 'aktualizr-native') | 29 | program, *_ = cmd.split(' ') |
23 | p = bb_vars['SYSROOT_DESTDIR'] + bb_vars['bindir'] + "/" + "garage-push" | 30 | p = '{}/{}'.format(self.sysrootbin, program) |
24 | self.assertTrue(os.path.isfile(p), msg = "No garage-push found (%s)" % p) | 31 | self.assertTrue(os.path.isfile(p), msg="No {} found ({})".format(program, p)) |
25 | result = runCmd('%s --help' % p, ignore_status=True) | 32 | env = dict(os.environ) |
33 | env['LD_LIBRARY_PATH'] = self.libdir | ||
34 | result = runCmd(cmd, env=env, native_sysroot=self.sysroot, ignore_status=True, **kwargs) | ||
26 | self.assertEqual(result.status, 0, "Status not equal to 0. output: %s" % result.output) | 35 | self.assertEqual(result.status, 0, "Status not equal to 0. output: %s" % result.output) |
27 | 36 | ||
37 | def test_push_help(self): | ||
38 | self.runNativeCmd('garage-push --help') | ||
39 | |||
28 | def test_deploy_help(self): | 40 | def test_deploy_help(self): |
29 | bb_vars = get_bb_vars(['SYSROOT_DESTDIR', 'bindir'], 'aktualizr-native') | 41 | self.runNativeCmd('garage-deploy --help') |
30 | p = bb_vars['SYSROOT_DESTDIR'] + bb_vars['bindir'] + "/" + "garage-deploy" | ||
31 | self.assertTrue(os.path.isfile(p), msg = "No garage-deploy found (%s)" % p) | ||
32 | result = runCmd('%s --help' % p, ignore_status=True) | ||
33 | self.assertEqual(result.status, 0, "Status not equal to 0. output: %s" % result.output) | ||
34 | 42 | ||
35 | def test_garagesign_help(self): | 43 | def test_garagesign_help(self): |
36 | bb_vars = get_bb_vars(['SYSROOT_DESTDIR', 'bindir'], 'aktualizr-native') | 44 | self.runNativeCmd('garage-sign --help') |
37 | p = bb_vars['SYSROOT_DESTDIR'] + bb_vars['bindir'] + "/" + "garage-sign" | ||
38 | self.assertTrue(os.path.isfile(p), msg = "No garage-sign found (%s)" % p) | ||
39 | result = runCmd('%s --help' % p, ignore_status=True) | ||
40 | self.assertEqual(result.status, 0, "Status not equal to 0. output: %s" % result.output) | ||
41 | |||
42 | class HsmTests(oeSelfTest): | ||
43 | |||
44 | def test_hsm(self): | ||
45 | self.write_config('SOTA_CLIENT_FEATURES="hsm"') | ||
46 | bitbake('core-image-minimal') | ||
47 | 45 | ||
48 | 46 | ||
49 | class GeneralTests(oeSelfTest): | 47 | class GeneralTests(oeSelfTest): |
@@ -57,6 +55,9 @@ class GeneralTests(oeSelfTest): | |||
57 | self.assertNotEqual(result, -1, 'Feature "systemd" not set at DISTRO_FEATURES') | 55 | self.assertNotEqual(result, -1, 'Feature "systemd" not set at DISTRO_FEATURES') |
58 | 56 | ||
59 | def test_credentials(self): | 57 | def test_credentials(self): |
58 | logger = logging.getLogger("selftest") | ||
59 | logger.info('Running bitbake to build core-image-minimal') | ||
60 | self.append_config('SOTA_CLIENT_PROV = "aktualizr-auto-prov"') | ||
60 | bitbake('core-image-minimal') | 61 | bitbake('core-image-minimal') |
61 | credentials = get_bb_var('SOTA_PACKED_CREDENTIALS') | 62 | credentials = get_bb_var('SOTA_PACKED_CREDENTIALS') |
62 | # skip the test if the variable SOTA_PACKED_CREDENTIALS is not set | 63 | # skip the test if the variable SOTA_PACKED_CREDENTIALS is not set |
@@ -73,7 +74,8 @@ class GeneralTests(oeSelfTest): | |||
73 | 74 | ||
74 | def test_java(self): | 75 | def test_java(self): |
75 | result = runCmd('which java', ignore_status=True) | 76 | result = runCmd('which java', ignore_status=True) |
76 | self.assertEqual(result.status, 0, "Java not found.") | 77 | self.assertEqual(result.status, 0, |
78 | "Java not found. Do you have a JDK installed on your host machine?") | ||
77 | 79 | ||
78 | def test_add_package(self): | 80 | def test_add_package(self): |
79 | print('') | 81 | print('') |
@@ -83,7 +85,7 @@ class GeneralTests(oeSelfTest): | |||
83 | logger = logging.getLogger("selftest") | 85 | logger = logging.getLogger("selftest") |
84 | 86 | ||
85 | logger.info('Running bitbake with man in the image package list') | 87 | logger.info('Running bitbake with man in the image package list') |
86 | self.write_config('IMAGE_INSTALL_append = " man "') | 88 | self.append_config('IMAGE_INSTALL_append = " man "') |
87 | bitbake('-c cleanall man') | 89 | bitbake('-c cleanall man') |
88 | bitbake('core-image-minimal') | 90 | bitbake('core-image-minimal') |
89 | result = runCmd('oe-pkgdata-util find-path /usr/bin/man') | 91 | result = runCmd('oe-pkgdata-util find-path /usr/bin/man') |
@@ -93,7 +95,7 @@ class GeneralTests(oeSelfTest): | |||
93 | logger.info('First image %s has size %i' % (path1, size1)) | 95 | logger.info('First image %s has size %i' % (path1, size1)) |
94 | 96 | ||
95 | logger.info('Running bitbake without man in the image package list') | 97 | logger.info('Running bitbake without man in the image package list') |
96 | self.write_config('IMAGE_INSTALL_remove = " man "') | 98 | self.append_config('IMAGE_INSTALL_remove = " man "') |
97 | bitbake('-c cleanall man') | 99 | bitbake('-c cleanall man') |
98 | bitbake('core-image-minimal') | 100 | bitbake('core-image-minimal') |
99 | result = runCmd('oe-pkgdata-util find-path /usr/bin/man', ignore_status=True) | 101 | result = runCmd('oe-pkgdata-util find-path /usr/bin/man', ignore_status=True) |
@@ -106,6 +108,61 @@ class GeneralTests(oeSelfTest): | |||
106 | self.assertNotEqual(size1, size2, "Image sizes are identical; image was not rebuilt.") | 108 | self.assertNotEqual(size1, size2, "Image sizes are identical; image was not rebuilt.") |
107 | 109 | ||
108 | 110 | ||
111 | class AktualizrToolsTests(oeSelfTest): | ||
112 | |||
113 | @classmethod | ||
114 | def setUpClass(cls): | ||
115 | logger = logging.getLogger("selftest") | ||
116 | bb_vars = get_bb_vars(['SYSROOT_DESTDIR', 'base_prefix', 'libdir', 'bindir'], | ||
117 | 'aktualizr-native') | ||
118 | cls.sysroot = bb_vars['SYSROOT_DESTDIR'] + bb_vars['base_prefix'] | ||
119 | cls.sysrootbin = bb_vars['SYSROOT_DESTDIR'] + bb_vars['bindir'] | ||
120 | cls.libdir = bb_vars['libdir'] | ||
121 | |||
122 | logger.info('Running bitbake to build aktualizr-native tools') | ||
123 | bitbake('aktualizr-native') | ||
124 | |||
125 | def runNativeCmd(self, cmd, **kwargs): | ||
126 | program, *_ = cmd.split(' ') | ||
127 | p = '{}/{}'.format(self.sysrootbin, program) | ||
128 | self.assertTrue(os.path.isfile(p), msg="No {} found ({})".format(program, p)) | ||
129 | env = dict(os.environ) | ||
130 | env['LD_LIBRARY_PATH'] = self.libdir | ||
131 | result = runCmd(cmd, env=env, native_sysroot=self.sysroot, ignore_status=True, **kwargs) | ||
132 | self.assertEqual(result.status, 0, "Status not equal to 0. output: %s" % result.output) | ||
133 | |||
134 | def test_implicit_writer_help(self): | ||
135 | self.runNativeCmd('aktualizr_implicit_writer --help') | ||
136 | |||
137 | def test_cert_provider_help(self): | ||
138 | self.runNativeCmd('aktualizr_cert_provider --help') | ||
139 | |||
140 | def test_cert_provider_local_output(self): | ||
141 | logger = logging.getLogger("selftest") | ||
142 | logger.info('Running bitbake to build aktualizr-implicit-prov') | ||
143 | bitbake('aktualizr-implicit-prov') | ||
144 | bb_vars = get_bb_vars(['SYSROOT_DESTDIR', 'bindir', 'libdir', | ||
145 | 'SOTA_PACKED_CREDENTIALS', 'T'], 'aktualizr-native') | ||
146 | creds = bb_vars['SOTA_PACKED_CREDENTIALS'] | ||
147 | temp_dir = bb_vars['T'] | ||
148 | bb_vars_prov = get_bb_vars(['STAGING_DIR_NATIVE', 'libdir'], 'aktualizr-implicit-prov') | ||
149 | config = bb_vars_prov['STAGING_DIR_NATIVE'] + bb_vars_prov['libdir'] + '/sota/sota_implicit_prov.toml' | ||
150 | |||
151 | self.runNativeCmd('aktualizr_cert_provider -c {creds} -r -l {temp} -g {config}' | ||
152 | .format(creds=creds, temp=temp_dir, config=config)) | ||
153 | |||
154 | # Might be nice if these names weren't hardcoded. | ||
155 | cert_path = temp_dir + '/client.pem' | ||
156 | self.assertTrue(os.path.isfile(cert_path), "Client certificate not found at %s." % cert_path) | ||
157 | self.assertTrue(os.path.getsize(cert_path) > 0, "Client certificate at %s is empty." % cert_path) | ||
158 | pkey_path = temp_dir + '/pkey.pem' | ||
159 | self.assertTrue(os.path.isfile(pkey_path), "Private key not found at %s." % pkey_path) | ||
160 | self.assertTrue(os.path.getsize(pkey_path) > 0, "Private key at %s is empty." % pkey_path) | ||
161 | ca_path = temp_dir + '/root.crt' | ||
162 | self.assertTrue(os.path.isfile(ca_path), "Client certificate not found at %s." % ca_path) | ||
163 | self.assertTrue(os.path.getsize(ca_path) > 0, "Client certificate at %s is empty." % ca_path) | ||
164 | |||
165 | |||
109 | class QemuTests(oeSelfTest): | 166 | class QemuTests(oeSelfTest): |
110 | 167 | ||
111 | @classmethod | 168 | @classmethod |
@@ -116,13 +173,14 @@ class QemuTests(oeSelfTest): | |||
116 | def tearDownClass(cls): | 173 | def tearDownClass(cls): |
117 | qemu_terminate(cls.s) | 174 | qemu_terminate(cls.s) |
118 | 175 | ||
119 | def run_command(self, command): | 176 | def qemu_command(self, command): |
120 | return qemu_send_command(self.qemu.ssh_port, command) | 177 | return qemu_send_command(self.qemu.ssh_port, command) |
121 | 178 | ||
122 | def test_hostname(self): | 179 | def test_qemu(self): |
123 | print('') | ||
124 | print('Checking machine name (hostname) of device:') | 180 | print('Checking machine name (hostname) of device:') |
125 | stdout, stderr, retcode = self.run_command('hostname') | 181 | stdout, stderr, retcode = self.qemu_command('hostname') |
182 | self.assertEqual(retcode, 0, "Unable to check hostname. " + | ||
183 | "Is an ssh daemon (such as dropbear or openssh) installed on the device?") | ||
126 | machine = get_bb_var('MACHINE', 'core-image-minimal') | 184 | machine = get_bb_var('MACHINE', 'core-image-minimal') |
127 | self.assertEqual(stderr, b'', 'Error: ' + stderr.decode()) | 185 | self.assertEqual(stderr, b'', 'Error: ' + stderr.decode()) |
128 | # Strip off line ending. | 186 | # Strip off line ending. |
@@ -130,30 +188,16 @@ class QemuTests(oeSelfTest): | |||
130 | self.assertEqual(value_str, machine, | 188 | self.assertEqual(value_str, machine, |
131 | 'MACHINE does not match hostname: ' + machine + ', ' + value_str) | 189 | 'MACHINE does not match hostname: ' + machine + ', ' + value_str) |
132 | print(value_str) | 190 | print(value_str) |
133 | |||
134 | def test_var_sota(self): | ||
135 | print('') | ||
136 | print('Checking contents of /var/sota:') | ||
137 | stdout, stderr, retcode = self.run_command('ls /var/sota') | ||
138 | self.assertEqual(stderr, b'', 'Error: ' + stderr.decode()) | ||
139 | self.assertEqual(retcode, 0) | ||
140 | print(stdout.decode()) | ||
141 | |||
142 | def test_aktualizr_info(self): | ||
143 | print('Checking output of aktualizr-info:') | 191 | print('Checking output of aktualizr-info:') |
144 | ran_ok = False | 192 | ran_ok = False |
145 | for delay in [0, 1, 2, 5, 10, 15]: | 193 | for delay in [0, 1, 2, 5, 10, 15]: |
146 | sleep(delay) | 194 | sleep(delay) |
147 | try: | 195 | stdout, stderr, retcode = self.qemu_command('aktualizr-info') |
148 | stdout, stderr, retcode = self.run_command('aktualizr-info') | 196 | if retcode == 0 and stderr == b'': |
149 | if retcode == 0 and stderr == b'': | 197 | ran_ok = True |
150 | ran_ok = True | 198 | break |
151 | break | 199 | self.assertTrue(ran_ok, 'aktualizr-info failed: ' + stderr.decode() + stdout.decode()) |
152 | except IOError as e: | 200 | |
153 | print(e) | ||
154 | if not ran_ok: | ||
155 | print(stdout.decode()) | ||
156 | print(stderr.decode()) | ||
157 | 201 | ||
158 | class GrubTests(oeSelfTest): | 202 | class GrubTests(oeSelfTest): |
159 | 203 | ||
@@ -174,19 +218,164 @@ class GrubTests(oeSelfTest): | |||
174 | runCmd('bitbake-layers remove-layer "%s"' % self.meta_intel, ignore_status=True) | 218 | runCmd('bitbake-layers remove-layer "%s"' % self.meta_intel, ignore_status=True) |
175 | runCmd('bitbake-layers remove-layer "%s"' % self.meta_minnow, ignore_status=True) | 219 | runCmd('bitbake-layers remove-layer "%s"' % self.meta_minnow, ignore_status=True) |
176 | 220 | ||
221 | def qemu_command(self, command): | ||
222 | return qemu_send_command(self.qemu.ssh_port, command) | ||
223 | |||
177 | def test_grub(self): | 224 | def test_grub(self): |
178 | print('') | 225 | print('') |
179 | print('Checking machine name (hostname) of device:') | 226 | print('Checking machine name (hostname) of device:') |
180 | value, err, retcode = qemu_send_command(self.qemu.ssh_port, 'hostname') | 227 | stdout, stderr, retcode = self.qemu_command('hostname') |
228 | self.assertEqual(retcode, 0, "Unable to check hostname. " + | ||
229 | "Is an ssh daemon (such as dropbear or openssh) installed on the device?") | ||
181 | machine = get_bb_var('MACHINE', 'core-image-minimal') | 230 | machine = get_bb_var('MACHINE', 'core-image-minimal') |
182 | self.assertEqual(err, b'', 'Error: ' + err.decode()) | 231 | self.assertEqual(stderr, b'', 'Error: ' + stderr.decode()) |
183 | self.assertEqual(retcode, 0) | ||
184 | # Strip off line ending. | 232 | # Strip off line ending. |
185 | value_str = value.decode()[:-1] | 233 | value = stdout.decode()[:-1] |
234 | self.assertEqual(value, machine, | ||
235 | 'MACHINE does not match hostname: ' + machine + ', ' + value + | ||
236 | '\nIs TianoCore ovmf installed on your host machine?') | ||
237 | print(value) | ||
238 | print('Checking output of aktualizr-info:') | ||
239 | ran_ok = False | ||
240 | for delay in [0, 1, 2, 5, 10, 15]: | ||
241 | sleep(delay) | ||
242 | stdout, stderr, retcode = self.qemu_command('aktualizr-info') | ||
243 | if retcode == 0 and stderr == b'': | ||
244 | ran_ok = True | ||
245 | break | ||
246 | self.assertTrue(ran_ok, 'aktualizr-info failed: ' + stderr.decode() + stdout.decode()) | ||
247 | |||
248 | |||
249 | class HsmTests(oeSelfTest): | ||
250 | |||
251 | @classmethod | ||
252 | def setUpClass(cls): | ||
253 | bb_vars = get_bb_vars(['SYSROOT_DESTDIR', 'base_prefix', 'libdir', 'bindir'], | ||
254 | 'aktualizr-native') | ||
255 | cls.sysroot = bb_vars['SYSROOT_DESTDIR'] + bb_vars['base_prefix'] | ||
256 | cls.sysrootbin = bb_vars['SYSROOT_DESTDIR'] + bb_vars['bindir'] | ||
257 | cls.libdir = bb_vars['libdir'] | ||
258 | |||
259 | def setUpLocal(self): | ||
260 | self.append_config('SOTA_CLIENT_PROV = "aktualizr-hsm-prov"') | ||
261 | self.append_config('SOTA_CLIENT_FEATURES = "hsm"') | ||
262 | self.qemu, self.s = qemu_launch(machine='qemux86-64') | ||
263 | |||
264 | def tearDownLocal(self): | ||
265 | qemu_terminate(self.s) | ||
266 | |||
267 | def runNativeCmd(self, cmd, **kwargs): | ||
268 | program, *_ = cmd.split(' ') | ||
269 | p = '{}/{}'.format(self.sysrootbin, program) | ||
270 | self.assertTrue(os.path.isfile(p), msg="No {} found ({})".format(program, p)) | ||
271 | env = dict(os.environ) | ||
272 | env['LD_LIBRARY_PATH'] = self.libdir | ||
273 | result = runCmd(cmd, env=env, native_sysroot=self.sysroot, ignore_status=True, **kwargs) | ||
274 | self.assertEqual(result.status, 0, "Status not equal to 0. output: %s" % result.output) | ||
275 | |||
276 | def qemu_command(self, command): | ||
277 | return qemu_send_command(self.qemu.ssh_port, command) | ||
278 | |||
279 | def test_provisioning(self): | ||
280 | print('Checking machine name (hostname) of device:') | ||
281 | stdout, stderr, retcode = self.qemu_command('hostname') | ||
282 | self.assertEqual(retcode, 0, "Unable to check hostname. " + | ||
283 | "Is an ssh daemon (such as dropbear or openssh) installed on the device?") | ||
284 | machine = get_bb_var('MACHINE', 'core-image-minimal') | ||
285 | self.assertEqual(stderr, b'', 'Error: ' + stderr.decode()) | ||
286 | # Strip off line ending. | ||
287 | value_str = stdout.decode()[:-1] | ||
186 | self.assertEqual(value_str, machine, | 288 | self.assertEqual(value_str, machine, |
187 | 'MACHINE does not match hostname: ' + machine + ', ' + value_str + | 289 | 'MACHINE does not match hostname: ' + machine + ', ' + value_str) |
188 | '\nIs tianocore ovmf installed?') | ||
189 | print(value_str) | 290 | print(value_str) |
291 | print('Checking output of aktualizr-info:') | ||
292 | ran_ok = False | ||
293 | for delay in [0, 1, 2, 5, 10, 15]: | ||
294 | stdout, stderr, retcode = self.qemu_command('aktualizr-info') | ||
295 | if retcode == 0 and stderr == b'': | ||
296 | ran_ok = True | ||
297 | break | ||
298 | self.assertTrue(ran_ok, 'aktualizr-info failed: ' + stderr.decode() + stdout.decode()) | ||
299 | # Verify that device has NOT yet provisioned. | ||
300 | self.assertIn(b'Couldn\'t load device ID', stdout, | ||
301 | 'Device already provisioned!? ' + stderr.decode() + stdout.decode()) | ||
302 | self.assertIn(b'Couldn\'t load ECU serials', stdout, | ||
303 | 'Device already provisioned!? ' + stderr.decode() + stdout.decode()) | ||
304 | self.assertIn(b'Provisioned on server: no', stdout, | ||
305 | 'Device already provisioned!? ' + stderr.decode() + stdout.decode()) | ||
306 | self.assertIn(b'Fetched metadata: no', stdout, | ||
307 | 'Device already provisioned!? ' + stderr.decode() + stdout.decode()) | ||
308 | |||
309 | # Verify that HSM is not yet initialized. | ||
310 | pkcs11_command = 'pkcs11-tool --module=/usr/lib/softhsm/libsofthsm2.so -O' | ||
311 | stdout, stderr, retcode = self.qemu_command(pkcs11_command) | ||
312 | self.assertNotEqual(retcode, 0, 'pkcs11-tool succeeded before initialization: ' + | ||
313 | stdout.decode() + stderr.decode()) | ||
314 | softhsm2_command = 'softhsm2-util --show-slots' | ||
315 | stdout, stderr, retcode = self.qemu_command(softhsm2_command) | ||
316 | self.assertNotEqual(retcode, 0, 'softhsm2-tool succeeded before initialization: ' + | ||
317 | stdout.decode() + stderr.decode()) | ||
318 | |||
319 | # Run cert_provider. | ||
320 | bb_vars = get_bb_vars(['SYSROOT_DESTDIR', 'bindir', 'libdir', | ||
321 | 'SOTA_PACKED_CREDENTIALS'], 'aktualizr-native') | ||
322 | creds = bb_vars['SOTA_PACKED_CREDENTIALS'] | ||
323 | bb_vars_prov = get_bb_vars(['STAGING_DIR_NATIVE', 'libdir'], 'aktualizr-hsm-prov') | ||
324 | config = bb_vars_prov['STAGING_DIR_NATIVE'] + bb_vars_prov['libdir'] + '/sota/sota_implicit_prov.toml' | ||
325 | |||
326 | self.runNativeCmd('aktualizr_cert_provider -c {creds} -t root@localhost -p {port} -r -s -g {config}' | ||
327 | .format(creds=creds, port=self.qemu.ssh_port, config=config)) | ||
328 | |||
329 | # Verify that HSM is able to initialize. | ||
330 | ran_ok = False | ||
331 | for delay in [5, 5, 5, 5, 10]: | ||
332 | sleep(delay) | ||
333 | p11_out, p11_err, p11_ret = self.qemu_command(pkcs11_command) | ||
334 | hsm_out, hsm_err, hsm_ret = self.qemu_command(softhsm2_command) | ||
335 | if p11_ret == 0 and hsm_ret == 0 and hsm_err == b'': | ||
336 | ran_ok = True | ||
337 | break | ||
338 | self.assertTrue(ran_ok, 'pkcs11-tool or softhsm2-tool failed: ' + p11_err.decode() + | ||
339 | p11_out.decode() + hsm_err.decode() + hsm_out.decode()) | ||
340 | self.assertIn(b'present token', p11_err, 'pkcs11-tool failed: ' + p11_err.decode() + p11_out.decode()) | ||
341 | self.assertIn(b'X.509 cert', p11_out, 'pkcs11-tool failed: ' + p11_err.decode() + p11_out.decode()) | ||
342 | self.assertIn(b'Initialized: yes', hsm_out, 'softhsm2-tool failed: ' + | ||
343 | hsm_err.decode() + hsm_out.decode()) | ||
344 | self.assertIn(b'User PIN init.: yes', hsm_out, 'softhsm2-tool failed: ' + | ||
345 | hsm_err.decode() + hsm_out.decode()) | ||
346 | |||
347 | # Check that pkcs11 output matches sofhsm output. | ||
348 | p11_p = re.compile(r'Using slot [0-9] with a present token \((0x[0-9a-f]*)\)\s') | ||
349 | p11_m = p11_p.search(p11_err.decode()) | ||
350 | self.assertTrue(p11_m, 'Slot number not found with pkcs11-tool: ' + p11_err.decode() + p11_out.decode()) | ||
351 | self.assertGreater(p11_m.lastindex, 0, 'Slot number not found with pkcs11-tool: ' + | ||
352 | p11_err.decode() + p11_out.decode()) | ||
353 | hsm_p = re.compile(r'Description:\s*SoftHSM slot ID (0x[0-9a-f]*)\s') | ||
354 | hsm_m = hsm_p.search(hsm_out.decode()) | ||
355 | self.assertTrue(hsm_m, 'Slot number not found with softhsm2-tool: ' + hsm_err.decode() + hsm_out.decode()) | ||
356 | self.assertGreater(hsm_m.lastindex, 0, 'Slot number not found with softhsm2-tool: ' + | ||
357 | hsm_err.decode() + hsm_out.decode()) | ||
358 | self.assertEqual(p11_m.group(1), hsm_m.group(1), 'Slot number does not match: ' + | ||
359 | p11_err.decode() + p11_out.decode() + hsm_err.decode() + hsm_out.decode()) | ||
360 | |||
361 | # Verify that device HAS provisioned. | ||
362 | ran_ok = False | ||
363 | for delay in [5, 5, 5, 5, 10]: | ||
364 | sleep(delay) | ||
365 | stdout, stderr, retcode = self.qemu_command('aktualizr-info') | ||
366 | if retcode == 0 and stderr == b'' and stdout.decode().find('Fetched metadata: yes') >= 0: | ||
367 | ran_ok = True | ||
368 | break | ||
369 | self.assertIn(b'Device ID: ', stdout, 'Provisioning failed: ' + stderr.decode() + stdout.decode()) | ||
370 | self.assertIn(b'Primary ecu hardware ID: qemux86-64', stdout, | ||
371 | 'Provisioning failed: ' + stderr.decode() + stdout.decode()) | ||
372 | self.assertIn(b'Fetched metadata: yes', stdout, 'Provisioning failed: ' + stderr.decode() + stdout.decode()) | ||
373 | p = re.compile(r'Device ID: ([a-z0-9-]*)\n') | ||
374 | m = p.search(stdout.decode()) | ||
375 | self.assertTrue(m, 'Device ID could not be read: ' + stderr.decode() + stdout.decode()) | ||
376 | self.assertGreater(m.lastindex, 0, 'Device ID could not be read: ' + stderr.decode() + stdout.decode()) | ||
377 | logger = logging.getLogger("selftest") | ||
378 | logger.info('Device successfully provisioned with ID: ' + m.group(1)) | ||
190 | 379 | ||
191 | 380 | ||
192 | def qemu_launch(efi=False, machine=None): | 381 | def qemu_launch(efi=False, machine=None): |
@@ -216,12 +405,14 @@ def qemu_launch(efi=False, machine=None): | |||
216 | sleep(10) | 405 | sleep(10) |
217 | return qemu, s | 406 | return qemu, s |
218 | 407 | ||
408 | |||
219 | def qemu_terminate(s): | 409 | def qemu_terminate(s): |
220 | try: | 410 | try: |
221 | s.terminate() | 411 | s.terminate() |
222 | except KeyboardInterrupt: | 412 | except KeyboardInterrupt: |
223 | pass | 413 | pass |
224 | 414 | ||
415 | |||
225 | def qemu_send_command(port, command): | 416 | def qemu_send_command(port, command): |
226 | command = ['ssh -q -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no root@localhost -p ' + | 417 | command = ['ssh -q -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no root@localhost -p ' + |
227 | str(port) + ' "' + command + '"'] | 418 | str(port) + ' "' + command + '"'] |