Coverage for qml_essentials/model.py: 92%

405 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-29 14:55 +0000

1from typing import Dict, Optional, Tuple, Callable, Union, List 

2import pennylane as qml 

3import pennylane.numpy as np 

4import hashlib 

5import os 

6import warnings 

7from autograd.numpy import numpy_boxes 

8from copy import deepcopy 

9import math 

10 

11from qml_essentials.ansaetze import Gates, Ansaetze, Circuit 

12from qml_essentials.utils import PauliCircuit, QuanTikz, MultiprocessingPool 

13 

14 

15import logging 

16 

17log = logging.getLogger(__name__) 

18 

19 

20class Model: 

21 """ 

22 A quantum circuit model. 

23 """ 

24 

25 lightning_threshold = 12 

26 cpu_scaler = 0.9 # default cpu scaler, =1 means full CPU for MP 

27 

def __init__(
    self,
    n_qubits: int,
    n_layers: int,
    circuit_type: Union[str, Circuit] = "No_Ansatz",
    data_reupload: Union[bool, List[bool], List[List[bool]]] = True,
    state_preparation: Union[str, Callable, List[str], List[Callable]] = None,
    encoding: Union[str, Callable, List[str], List[Callable]] = Gates.RX,
    trainable_frequencies: bool = False,
    initialization: str = "random",
    initialization_domain: List[float] = [0, 2 * np.pi],
    output_qubit: Union[List[int], int] = -1,
    shots: Optional[int] = None,
    random_seed: int = 1000,
    as_pauli_circuit: bool = False,
    remove_zero_encoding: bool = True,
    mp_threshold: int = -1,
) -> None:
    """
    Initialize the quantum circuit model.

    Parameters will have the shape [impl_n_layers, parameters_per_layer],
    where impl_n_layers equals n_layers plus one additional layer whenever
    at least one input feature is encoded more than once (degree > 1), and
    parameters_per_layer is given by the chosen ansatz.

    The model is initialized with the following parameters as defaults:
    - noise_params: None
    - execution_type: "expval"
    - shots: None

    Args:
        n_qubits (int): The number of qubits in the circuit.
        n_layers (int): The number of layers in the circuit.
        circuit_type (str, Circuit): The type of quantum circuit to use.
            If None or an empty string, defaults to "No_Ansatz".
        data_reupload (Union[bool, List[bool], List[List[bool]]], optional):
            Whether to reupload data to the quantum device on each
            layer and qubit. Detailed re-uploading instructions can be given
            as a list/array of 0/False and 1/True with shape
            (n_layers, n_qubits) or (n_layers, n_qubits, n_input_feat) to
            specify where to upload the data. Defaults to True for applying
            data re-uploading to the full circuit. False encodes the data
            only on the first qubit of the first layer.
        state_preparation (Union[str, Callable, List[str], List[Callable]],
            optional): Gate(s) applied to every qubit before the first
            layer. Strings are looked up on `Gates`. Defaults to None
            (no state preparation).
        encoding (Union[str, Callable, List[str], List[Callable]], optional):
            The unitary to use for encoding the input data. Can be a string
            (e.g. "RX") or a callable (e.g. Gates.RX). Defaults to Gates.RX.
            If input is multidimensional it is assumed to be a list of
            unitaries or a list of strings, one entry per input feature.
        trainable_frequencies (bool, optional):
            Sets trainable encoding parameters for trainable frequencies.
            Defaults to False.
        initialization (str, optional): The strategy to initialize the
            parameters. Can be "random", "zeros", "zero-controlled", "pi",
            or "pi-controlled". Defaults to "random".
        initialization_domain (List[float], optional): Lower and upper
            bound used for random initialization. Defaults to [0, 2*pi].
        output_qubit (List[int], int, optional): The index of the output
            qubit (or qubits). When set to -1 all qubits are measured, or a
            global measurement is conducted, depending on the execution
            type.
        shots (Optional[int], optional): The number of shots to use for
            the quantum device. Defaults to None.
        random_seed (int, optional): seed for the random number generator
            used if initialization is "random" and for `Gates.init_rng`.
            Defaults to 1000.
        as_pauli_circuit (bool, optional): whether the circuit is
            transformed to a Pauli-Clifford circuit as described by Nemkov
            et al. (https://doi.org/10.1103/PhysRevA.108.032406), which is
            required for analytical Fourier coefficient computation.
            Defaults to False.
        remove_zero_encoding (bool, optional): whether to
            remove the zero encoding from the circuit. Defaults to True.
        mp_threshold (int, optional): threshold above which the parameter
            batch dimension is split across multiple processes.
            Defaults to -1.

    Returns:
        None

    Raises:
        AssertionError: If `output_qubit` lists more qubits than available
            or a `data_reupload` array has an unexpected shape.
        ValueError: If `encoding` is an empty list.
    """
    # Initialize default parameters needed for circuit evaluation.
    # NOTE: the `execution_type` setter only inspects `output_qubit`/`shots`
    # for non-"expval" values, so assigning it this early is safe.
    self.noise_params: Optional[Dict[str, Union[float, Dict[str, float]]]] = None
    self.execution_type: Optional[str] = "expval"
    self.shots = shots
    self.remove_zero_encoding = remove_zero_encoding
    self.mp_threshold = mp_threshold
    self.n_qubits: int = n_qubits
    self.n_layers: int = n_layers
    self.trainable_frequencies: bool = trainable_frequencies

    if isinstance(output_qubit, list):
        assert len(output_qubit) <= n_qubits, (
            f"Size of output_qubit {len(output_qubit)} cannot be "
            f"larger than number of qubits {n_qubits}."
        )
    self.output_qubit: Union[List[int], int] = output_qubit

    # Initialize rng in Gates
    Gates.init_rng(random_seed)

    def _resolve_gates(spec) -> list:
        """Return a list of gates; strings are looked up on `Gates`."""
        if isinstance(spec, str):
            return [getattr(Gates, spec)]
        if isinstance(spec, list):
            # resolve element-wise so empty and mixed lists are handled
            return [
                getattr(Gates, entry) if isinstance(entry, str) else entry
                for entry in spec
            ]
        # anything else is assumed to be a single callable
        return [spec]

    # --- State Preparation ---
    if state_preparation is None:
        # no-op placeholder so the circuit code can call it unconditionally
        self._sp = [lambda *args, **kwargs: None]
    else:
        self._sp = _resolve_gates(state_preparation)

    # --- Encoding ---
    self._enc = _resolve_gates(encoding)
    if not self._enc:
        raise ValueError("`encoding` must contain at least one gate.")

    # Number of possible inputs
    self.n_input_feat = len(self._enc)
    log.info(f"Number of input features: {self.n_input_feat}")

    # Trainable frequencies, default initialization as in arXiv:2309.03279v2
    self.enc_params = np.ones(
        (self.n_qubits, self.n_input_feat), requires_grad=trainable_frequencies
    )

    # --- Data-Reuploading ---
    # Process data reuploading strategy; the result always has shape
    # (n_layers, n_qubits, n_input_feat)
    reupload_shape = (n_layers, n_qubits, self.n_input_feat)
    if isinstance(data_reupload, bool):
        if data_reupload:
            data_reupload = np.ones(reupload_shape)
            log.debug("Full data reuploading.")
        else:
            # encode only once: first layer, first qubit, all features
            data_reupload = np.zeros(reupload_shape)
            data_reupload[0][0] = 1
            log.debug("No data reuploading.")
    else:
        if not isinstance(data_reupload, np.ndarray):
            data_reupload = np.array(data_reupload)
        if data_reupload.shape == (n_layers, n_qubits):
            # broadcast a per-layer/per-qubit mask to every input feature
            data_reupload = data_reupload.reshape(*data_reupload.shape, 1)
            data_reupload = np.repeat(data_reupload, self.n_input_feat, axis=2)

        assert data_reupload.shape == reupload_shape, (
            "Data reuploading array has wrong shape. "
            f"Expected {(n_layers, n_qubits)} or {reupload_shape}, "
            f"got {data_reupload.shape}."
        )

        log.debug(f"Data reuploading array:\n{data_reupload}")

    # convert to boolean values
    self.data_reupload = data_reupload.astype(bool)
    # number of encoding positions per input feature
    self.frequencies = [
        np.count_nonzero(self.data_reupload[..., i])
        for i in range(self.n_input_feat)
    ]

    # we need L+1 layers according to Schuld et al. when data is re-uploaded
    impl_n_layers: int = n_layers + 1 if self.degree > 1 else n_layers
    log.info(f"Number of implicit layers: {impl_n_layers}.")

    # --- Ansatz ---
    # only weak check for str. We trust the user to provide sth useful
    if circuit_type is None or isinstance(circuit_type, str):
        self.pqc: Callable[[Optional[np.ndarray], int], int] = getattr(
            Ansaetze, circuit_type or "No_Ansatz"
        )()
    else:
        self.pqc = circuit_type()
    log.info(f"Using Ansatz {circuit_type}.")

    # calculate the shape of the parameter vector here, we will re-use this in init.
    params_per_layer = self.pqc.n_params_per_layer(self.n_qubits)
    self._params_shape: Tuple[int, int] = (impl_n_layers, params_per_layer)
    log.info(f"Parameters per layer: {params_per_layer}")

    self.batch_shape = (1, 1)
    # this will also be re-used in the init method,
    # however, only if nothing is provided
    self._inialization_strategy = initialization
    # copy so that the shared default list is never aliased by instances
    self._initialization_domain = list(initialization_domain)

    # ..here! where we only require a rng
    self.initialize_params(np.random.default_rng(random_seed))

    # Initialize two circuits, one with the default device and
    # one with the mixed device
    # which allows us to later route depending on the state_vector flag
    self.as_pauli_circuit = as_pauli_circuit

    self.circuit_mixed: qml.QNode = qml.QNode(
        self._circuit,
        qml.device("default.mixed", shots=self.shots, wires=self.n_qubits),
        interface="autograd" if self.shots is not None else "auto",
        diff_method="parameter-shift" if self.shots is not None else "best",
    )

247 

@property
def degree(self):
    """
    Gets the largest entry of `self.frequencies`.

    Returns:
        The maximum over all per-feature frequency counts.
    """
    per_feature_counts = self.frequencies
    return max(per_feature_counts)

251 

@property
def as_pauli_circuit(self) -> bool:
    """
    Gets the flag indicating whether the Pauli circuit transform is applied.

    Returns:
        bool: The value last assigned through the setter.
    """
    return self._as_pauli_circuit

@as_pauli_circuit.setter
def as_pauli_circuit(self, value: bool) -> None:
    """
    Stores the flag and (re)builds `self.circuit`.

    Below `lightning_threshold` qubits the "default.qubit" device is used,
    otherwise "lightning.qubit" is used and `mp_threshold` is reset to -1.
    If `value` is truthy, the resulting QNode is wrapped by the
    `PauliCircuit.from_parameterised_circuit` transform.

    Args:
        value (bool): Whether to apply the Pauli circuit transform.

    Returns:
        None
    """
    self._as_pauli_circuit = value

    use_lightning = not (self.n_qubits < self.lightning_threshold)
    if use_lightning:
        device_name = "lightning.qubit"
        self.mp_threshold = -1
    else:
        device_name = "default.qubit"

    backend = qml.device(device_name, shots=self.shots, wires=self.n_qubits)
    self.circuit: qml.QNode = qml.QNode(
        self._circuit,
        backend,
        interface="autograd" if self.shots is not None else "auto",
        diff_method="parameter-shift" if self.shots is not None else "best",
    )

    if not value:
        return

    to_pauli_circuit = qml.transform(PauliCircuit.from_parameterised_circuit)
    self.circuit = to_pauli_circuit(self.circuit)

282 

@property
def noise_params(self) -> Optional[Dict[str, Union[float, Dict[str, float]]]]:
    """
    Gets the noise parameters of the model.

    Returns:
        Optional[Dict[str, Union[float, Dict[str, float]]]]: A dictionary
        of noise parameters or None if not set.
    """
    return self._noise_params

@noise_params.setter
def noise_params(
    self, kvs: Optional[Dict[str, Union[float, Dict[str, float]]]]
) -> None:
    """
    Sets the noise parameters of the model.

    Typically a "noise parameter" refers to the error probability.
    ThermalRelaxation is a special case, and supports a dict as value with
    structure:
        "ThermalRelaxation":
        {
            "t1": 2000,  # relative t1 time
            "t2": 1000,  # relative t2 time
            "t_factor": 1,  # relative gate time factor
        },

    Missing noise types are filled in with their defaults directly in the
    provided dictionary (the dictionary is modified in place). Unknown keys
    only trigger a warning. A ThermalRelaxation dict with any falsy value or
    with t2 > 2 * t1 triggers a warning and is replaced by 0.0.

    Args:
        kvs (Optional[Dict[str, Union[float, Dict[str, float]]]]): A
            dictionary of noise parameters. If all values are 0.0, the noise
            parameters are set to None.

    Returns:
        None
    """
    # supported noise types and their default values (insertion order matters
    # for the resulting key order of the dictionary)
    defaults = {
        "BitFlip": 0.0,
        "PhaseFlip": 0.0,
        "Depolarizing": 0.0,
        "MultiQubitDepolarizing": 0.0,
        "AmplitudeDamping": 0.0,
        "PhaseDamping": 0.0,
        "GateError": 0.0,
        "ThermalRelaxation": None,
        "StatePreparation": 0.0,
        "Measurement": 0.0,
    }
    thermal_keys = ("t1", "t2", "t_factor")

    # a dictionary consisting only of zeros is equivalent to "no noise"
    if kvs is not None and all(level == 0.0 for level in kvs.values()):
        kvs = None

    if kvs is None:
        self._noise_params = kvs
        return

    for noise_type, default in defaults.items():
        kvs.setdefault(noise_type, default)

    # check if there are any keys not supported
    for noise_type in kvs.keys():
        if noise_type in defaults:
            continue
        warnings.warn(
            f"Noise type {noise_type} is not supported by this package",
            UserWarning,
        )

    # check valid params for thermal relaxation noise channel
    thermal = kvs["ThermalRelaxation"]
    if isinstance(thermal, dict):
        for name in thermal_keys:
            thermal.setdefault(name, 0.0)
        for name in thermal.keys():
            if name in thermal_keys:
                continue
            warnings.warn(
                f"Thermal Relaxation parameter {name} is not supported "
                f"by this package",
                UserWarning,
            )
        has_zero_entry = not all(thermal.values())
        if has_zero_entry or thermal["t2"] > 2 * thermal["t1"]:
            warnings.warn(
                "Received invalid values for Thermal Relaxation noise "
                "parameter. Thermal relaxation is not applied!",
                UserWarning,
            )
            kvs["ThermalRelaxation"] = 0.0

    self._noise_params = kvs

381 

@property
def execution_type(self) -> str:
    """
    Gets the execution type of the model.

    Returns:
        str: The execution type, one of 'density', 'state', 'expval',
        or 'probs'.
    """
    return self._execution_type

@execution_type.setter
def execution_type(self, value: str) -> None:
    """
    Sets the execution type of the model.

    Emits a UserWarning for combinations that are accepted but likely
    unintended ('density'/'state' with a specific output qubit, 'probs'
    without shots, 'density' with shots).

    Args:
        value (str): One of 'density', 'state', 'expval', or 'probs'.

    Raises:
        ValueError: If `value` is not one of the accepted execution types.
    """
    accepted = ("density", "state", "expval", "probs")
    if value not in accepted:
        raise ValueError(f"Invalid execution type: {value}.")

    # `output_qubit` and `shots` are only read when the value requires it
    measures_full_system = value in ("density", "state")
    if measures_full_system and self.output_qubit != -1:
        warnings.warn(
            f"{value} measurement does ignore output_qubit, which is "
            f"{self.output_qubit}.",
            UserWarning,
        )

    if value == "probs" and self.shots is None:
        warnings.warn(
            "Setting execution_type to probs without specifying shots.",
            UserWarning,
        )

    if value == "density" and self.shots is not None:
        warnings.warn(
            "Setting execution_type to density with specified shots.",
            UserWarning,
        )

    self._execution_type = value

417 

@property
def shots(self) -> Optional[int]:
    """
    Gets the number of shots to use for the quantum device.

    Returns:
        Optional[int]: The number of shots.
    """
    return self._shots

@shots.setter
def shots(self, value: Optional[int]) -> None:
    """
    Sets the number of shots to use for the quantum device.

    Args:
        value (Optional[int]): The number of shots.
            If an integer less than or equal to 0 is provided, it is set to
            None. Any integral type (e.g. NumPy integers) is recognised.

    Returns:
        None
    """
    # local import: `numbers` is not part of the module-level imports
    import numbers

    # an exact `type(value) is int` check would miss NumPy integer scalars
    if isinstance(value, numbers.Integral) and value <= 0:
        value = None
    self._shots = value

443 

def initialize_params(
    self,
    rng: np.random.Generator,
    repeat: int = None,
    initialization: str = None,
    initialization_domain: List[float] = None,
) -> None:
    """
    Initializes the parameters of the model and stores them in `self.params`.

    Args:
        rng: A random number generator to use for initialization.
        repeat: If given, an additional trailing dimension of this size is
            appended to the parameter shape. If None, the shape computed in
            the constructor is used unchanged.
        initialization: The strategy to use for parameter initialization.
            One of "random", "zeros", "pi", "zero-controlled" or
            "pi-controlled". If None (or empty), the strategy specified in
            the constructor is used.
        initialization_domain: The domain to use for parameter initialization.
            If None (or empty), the domain specified in the constructor is
            used.

    Returns:
        None

    Raises:
        ValueError: If the initialization strategy is unknown.
    """
    params_shape = (
        self._params_shape if repeat is None else [*self._params_shape, repeat]
    )
    # use existing strategy if not specified
    initialization = initialization or self._inialization_strategy
    # explicit emptiness check instead of truthiness so that array-valued
    # domains do not raise on ambiguous truth values
    if initialization_domain is None or len(initialization_domain) == 0:
        initialization_domain = self._initialization_domain

    # value assigned to the controlled-rotation parameters per strategy
    controlled_values = {"zero-controlled": 0, "pi-controlled": np.pi}

    def set_control_params(params: np.ndarray, value: float) -> np.ndarray:
        """Overwrite the controlled-rotation parameters with `value`."""
        indices = self.pqc.get_control_indices(self.n_qubits)
        if indices is None:
            warnings.warn(
                f"Specified {initialization} but circuit "
                "does not contain controlled rotation gates. "
                "Parameters are initialized randomly.",
                UserWarning,
            )
        else:
            # indices is used as (start, stop, step) along the second axis
            params[:, indices[0] : indices[1] : indices[2]] = value
        return params

    if initialization == "random" or initialization in controlled_values:
        params = rng.uniform(
            *initialization_domain, params_shape, requires_grad=True
        )
        if initialization in controlled_values:
            params = set_control_params(params, controlled_values[initialization])
    elif initialization == "zeros":
        params = np.zeros(params_shape, requires_grad=True)
    elif initialization == "pi":
        params = np.ones(params_shape, requires_grad=True) * np.pi
    else:
        raise ValueError(f"Invalid initialization method: {initialization}")

    self.params: np.ndarray = params

    log.info(
        f"Initialized parameters with shape {self.params.shape} "
        f"using strategy {initialization}."
    )

514 

def transform_input(self, inputs: np.ndarray, enc_params: Optional[np.ndarray]):
    """
    Transforms the input as in arXiv:2309.03279v2

    The input is scaled linearly by multiplying it with the encoding
    parameters.

    Args:
        inputs (np.ndarray): input value(s) to be encoded
        enc_params (np.ndarray): encoding weight(s) the input is
            multiplied with

    Returns:
        np.ndarray: the product `inputs * enc_params`, ready for encoding
    """
    scaled_inputs = inputs * enc_params
    return scaled_inputs

531 

532 def _iec( 

533 self, 

534 inputs: np.ndarray, 

535 data_reupload: np.ndarray, 

536 enc: Union[Callable, List[Callable]], 

537 enc_params: np.ndarray, 

538 noise_params: Optional[Dict[str, Union[float, Dict[str, float]]]] = None, 

539 ) -> None: 

540 """ 

541 Creates an AngleEncoding using RX gates 

542 

543 Args: 

544 inputs (np.ndarray): single input point of shape (1, n_input_feat) 

545 data_reupload (np.ndarray): Boolean array to indicate positions in 

546 the circuit for data re-uploading for the IEC, shape is 

547 (n_qubits, n_layers). 

548 enc: Callable or List[Callable]: encoding function or list of encoding 

549 functions 

550 enc_params (np.ndarray): encoding weight vector 

551 of shape [n_qubits, n_inputs] 

552 noise_params (Optional[Dict[str, Union[float, Dict[str, float]]]]): 

553 The noise parameters. 

554 Returns: 

555 None 

556 """ 

557 # check for zero, because due to input validation, input cannot be none 

558 if self.remove_zero_encoding and not inputs.any(): 

559 return 

560 

561 for q in range(self.n_qubits): 

562 for idx in range(inputs.shape[1]): 

563 if data_reupload[q, idx]: 

564 enc[idx]( 

565 self.transform_input(inputs[:, idx], enc_params[q, idx]), 

566 wires=q, 

567 noise_params=noise_params, 

568 ) 

569 

570 def _circuit( 

571 self, 

572 params: np.ndarray, 

573 inputs: np.ndarray, 

574 enc_params: Optional[np.ndarray] = None, 

575 ) -> Union[float, np.ndarray]: 

576 """ 

577 Creates a circuit with noise. 

578 

579 Args: 

580 params (np.ndarray): weight vector of shape 

581 [n_layers, n_qubits*(n_params_per_layer+trainable_frequencies)] 

582 inputs (np.ndarray): input vector of size 1 

583 enc_params Optional[np.ndarray]: encoding weight vector 

584 of shape [n_qubits, n_inputs] 

585 Returns: 

586 Union[float, np.ndarray]: Expectation value of PauliZ(0) 

587 of the circuit if state_vector is False and expval is True, 

588 otherwise the density matrix of all qubits. 

589 """ 

590 

591 self._variational(params=params, inputs=inputs, enc_params=enc_params) 

592 return self._observable() 

593 

594 def _variational(self, params, inputs, enc_params=None): 

595 if enc_params is None: 

596 warnings.warn( 

597 "Explicit call to `_circuit` or `_variational` detected: " 

598 "`enc_params` is None, using `self.enc_params` instead.", 

599 RuntimeWarning, 

600 ) 

601 enc_params = self.enc_params 

602 

603 if self.noise_params is not None: 

604 self._apply_state_prep_noise() 

605 

606 # state preparation 

607 for q in range(self.n_qubits): 

608 for _sp in self._sp: 

609 _sp(wires=q, noise_params=self.noise_params) 

610 

611 # circuit building 

612 for layer in range(0, self.n_layers): 

613 # ansatz layers 

614 self.pqc(params[layer], self.n_qubits, noise_params=self.noise_params) 

615 

616 # encoding layers 

617 self._iec( 

618 inputs, 

619 data_reupload=self.data_reupload[layer], 

620 enc=self._enc, 

621 enc_params=enc_params, 

622 noise_params=self.noise_params, 

623 ) 

624 

625 # visual barrier 

626 if self.degree > 1: 

627 qml.Barrier(wires=list(range(self.n_qubits)), only_visual=True) 

628 

629 # final ansatz layer 

630 if self.degree > 1: # same check as in init 

631 self.pqc(params[-1], self.n_qubits, noise_params=self.noise_params) 

632 

633 # channel noise 

634 if self.noise_params is not None: 

635 self._apply_general_noise() 

636 

637 def _observable(self): 

638 # run mixed simualtion and get density matrix 

639 if self.execution_type == "density": 

640 return qml.density_matrix(wires=list(range(self.n_qubits))) 

641 elif self.execution_type == "state": 

642 return qml.state() 

643 # run default simulation and get expectation value 

644 elif self.execution_type == "expval": 

645 # n-local measurement 

646 if self.output_qubit == -1: 

647 return [qml.expval(qml.PauliZ(q)) for q in range(self.n_qubits)] 

648 # local measurement(s) 

649 elif isinstance(self.output_qubit, int): 

650 return qml.expval(qml.PauliZ(self.output_qubit)) 

651 # parity measurenment 

652 elif isinstance(self.output_qubit, list): 

653 obs = qml.PauliZ(self.output_qubit[0]) 

654 for out_qubit in self.output_qubit[1:]: 

655 obs = obs @ qml.PauliZ(out_qubit) 

656 return qml.expval(obs) 

657 else: 

658 raise ValueError( 

659 f"Invalid parameter `output_qubit`: {self.output_qubit}.\ 

660 Must be int, list or -1." 

661 ) 

662 # run default simulation and get probs 

663 elif self.execution_type == "probs": 

664 if self.output_qubit == -1: 

665 return qml.probs(wires=list(range(self.n_qubits))) 

666 else: 

667 return qml.probs(wires=self.output_qubit) 

668 else: 

669 raise ValueError(f"Invalid execution_type: {self.execution_type}.") 

670 

671 def _apply_state_prep_noise(self) -> None: 

672 """ 

673 Applies a state preparation error on each qubit according to the 

674 probability for StatePreparation provided in the noise_params. 

675 """ 

676 p = self.noise_params.get("StatePreparation", 0.0) 

677 for q in range(self.n_qubits): 

678 if p > 0: 

679 qml.BitFlip(p, wires=q) 

680 

681 def _apply_general_noise(self) -> None: 

682 """ 

683 Applies general types of noise the full circuit (in contrast to gate 

684 errors, applied directly at gate level, see Gates.Noise). 

685 

686 Possible types of noise are: 

687 - AmplitudeDamping (specified through probability) 

688 - PhaseDamping (specified through probability) 

689 - ThermalRelaxation (specified through a dict, containing keys 

690 "t1", "t2", "t_factor") 

691 - Measurement (specified through probability) 

692 """ 

693 amp_damp = self.noise_params.get("AmplitudeDamping", 0.0) 

694 phase_damp = self.noise_params.get("PhaseDamping", 0.0) 

695 thermal_relax = self.noise_params.get("ThermalRelaxation", 0.0) 

696 meas = self.noise_params.get("Measurement", 0.0) 

697 for q in range(self.n_qubits): 

698 if amp_damp > 0: 

699 qml.AmplitudeDamping(amp_damp, wires=q) 

700 if phase_damp > 0: 

701 qml.PhaseDamping(phase_damp, wires=q) 

702 if meas > 0: 

703 qml.BitFlip(meas, wires=q) 

704 if isinstance(thermal_relax, dict): 

705 t1 = thermal_relax["t1"] 

706 t2 = thermal_relax["t2"] 

707 t_factor = thermal_relax["t_factor"] 

708 circuit_depth = self.get_circuit_depth() 

709 tg = circuit_depth * t_factor 

710 qml.ThermalRelaxationError(1.0, t1, t2, tg, q) 

711 

def draw(self, inputs=None, figure="text", *args, **kwargs):
    """
    Draws the quantum circuit using the specified visualization method.

    Args:
        inputs (Optional[np.ndarray]): Input vector for the circuit. If None,
            the default inputs are used.
        figure (str, optional): The type of figure to generate. Must be one of
            'text', 'mpl', or 'tikz'. Defaults to 'text'.
        *args:
            Additional arguments to be passed to the visualization method
            ('mpl' and 'tikz' only).
        **kwargs:
            Additional keyword arguments to be passed to the visualization
            method ('mpl' and 'tikz' only).

    Returns:
        Either a string, matplotlib figure or TikzFigure object (similar to string)
        depending on the chosen visualization. An empty string is returned if
        `self.circuit` is not a QNode.

    Raises:
        AssertionError: If the 'figure' argument is not one of the accepted values.
    """
    if not isinstance(self.circuit, qml.QNode):
        # TODO: throws strange argument error if not catched
        return ""

    supported_figures = ["text", "mpl", "tikz"]
    assert (
        figure in supported_figures
    ), f"Invalid figure: {figure}. Must be 'text', 'mpl' or 'tikz'."

    inputs = self._inputs_validation(inputs)

    # arguments with which the circuit itself is evaluated
    circuit_kwargs = {
        "params": self.params,
        "inputs": inputs,
        "enc_params": self.enc_params,
    }

    if figure == "mpl":
        return qml.draw_mpl(self.circuit)(*args, **circuit_kwargs, **kwargs)
    if figure == "tikz":
        return QuanTikz.build(self.circuit, *args, **circuit_kwargs, **kwargs)
    return qml.draw(self.circuit)(**circuit_kwargs)

767 

768 def __repr__(self) -> str: 

769 return self.draw(figure="text") 

770 

771 def __str__(self) -> str: 

772 return self.draw(figure="text") 

773 

774 def _params_validation(self, params) -> np.ndarray: 

775 """ 

776 Sets the parameters when calling the quantum circuit 

777 

778 Args: 

779 params (np.ndarray): The parameters used for the call 

780 """ 

781 if params is None: 

782 params = self.params 

783 else: 

784 if numpy_boxes.ArrayBox == type(params): 

785 self.params = params._value 

786 else: 

787 self.params = params 

788 

789 # Get rid of extra dimension 

790 if len(params.shape) == 3 and params.shape[2] == 1: 

791 params = params[:, :, 0] 

792 

793 return params 

794 

795 def _enc_params_validation(self, enc_params) -> np.ndarray: 

796 """ 

797 Sets the encoding parameters when calling the quantum circuit 

798 

799 Args: 

800 enc_params (np.ndarray): The encoding parameters used for the call 

801 """ 

802 if enc_params is None: 

803 enc_params = self.enc_params 

804 else: 

805 if isinstance(enc_params, numpy_boxes.ArrayBox): 

806 if self.trainable_frequencies: 

807 self.enc_params = enc_params._value 

808 else: 

809 self.enc_params = np.array( 

810 enc_params._value, requires_grad=self.trainable_frequencies 

811 ) 

812 else: 

813 if self.trainable_frequencies: 

814 self.enc_params = enc_params 

815 else: 

816 self.enc_params = np.array( 

817 enc_params, requires_grad=self.trainable_frequencies 

818 ) 

819 

820 if len(enc_params.shape) == 1 and self.n_input_feat == 1: 

821 enc_params = enc_params.reshape(-1, 1) 

822 elif len(enc_params.shape) == 1 and self.n_input_feat > 1: 

823 raise ValueError( 

824 f"Input dimension {self.n_input_feat} >1 but \ 

825 `enc_params` has shape {enc_params.shape}" 

826 ) 

827 

828 return enc_params 

829 

830 def _inputs_validation( 

831 self, inputs: Union[None, List, float, int, np.ndarray] 

832 ) -> np.ndarray: 

833 """ 

834 Validate the inputs to be a 2D numpy array of shape (batch_size, n_inputs). 

835 

836 Args: 

837 inputs (Union[None, List, float, int, np.ndarray]): The input to validate. 

838 

839 Returns: 

840 np.ndarray: The validated input. 

841 """ 

842 if inputs is None: 

843 # initialize to zero 

844 inputs = np.array([[0] * self.n_input_feat]) 

845 elif isinstance(inputs, List): 

846 inputs = np.stack(inputs) 

847 elif isinstance(inputs, float) or isinstance(inputs, int): 

848 inputs = np.array([inputs]) 

849 

850 if len(inputs.shape) <= 1: 

851 if self.n_input_feat == 1: 

852 # add a batch dimension 

853 inputs = inputs.reshape(-1, 1) 

854 else: 

855 if inputs.shape[0] == self.n_input_feat: 

856 inputs = inputs.reshape(1, -1) 

857 else: 

858 inputs = inputs.reshape(-1, 1) 

859 inputs = inputs.repeat(self.n_input_feat, axis=1) 

860 warnings.warn( 

861 f"Expected {self.n_input_feat} inputs, but {inputs.shape[0]} " 

862 "was provided, replicating input for all input features.", 

863 UserWarning, 

864 ) 

865 else: 

866 if inputs.shape[1] != self.n_input_feat: 

867 raise ValueError( 

868 f"Wrong number of inputs provided. Expected {self.n_input_feat} " 

869 f"inputs, but input has shape {inputs.shape}." 

870 ) 

871 

872 return inputs 

873 

874 @staticmethod 

875 def _parallel_f( 

876 procnum, 

877 result, 

878 f, 

879 batch_size, 

880 params, 

881 inputs, 

882 batch_shape, 

883 enc_params, 

884 ): 

885 """ 

886 Helper function for parallelizing a function f over parameters. 

887 Sices the batch dimension based on the procnum and batch size. 

888 

889 Args: 

890 procnum: The process number. 

891 result: The result array. 

892 f: The function to be parallelized. 

893 batch_size: The batch size. 

894 params: The parameters array. 

895 inputs: The inputs array. 

896 enc_params: The encoding parameters array. 

897 """ 

898 min_idx = max(procnum * batch_size, 0) 

899 

900 if batch_shape[0] > 1: 

901 max_idx = min((procnum + 1) * batch_size, inputs.shape[0]) 

902 inputs = inputs[min_idx:max_idx] 

903 if batch_shape[1] > 1: 

904 max_idx = min((procnum + 1) * batch_size, params.shape[2]) 

905 params = params[:, :, min_idx:max_idx] 

906 

907 result[procnum] = f(params=params, inputs=inputs, enc_params=enc_params) 

908 

909 def _mp_executor(self, f, params, inputs, enc_params): 

910 """ 

911 Execute a function f in parallel over parameters. 

912 

913 Args: 

914 f: A function that takes two arguments, params and inputs, 

915 and returns a numpy array. 

916 params: A 3D numpy array of parameters where the first dimension is 

917 the layer index, the second dimension is the parameter index in 

918 the layer, and the third dimension is the sample index. 

919 inputs: A 2D numpy array of inputs where the first dimension is 

920 the sample index and the second dimension is the input feature index. 

921 enc_params: A 1D numpy array of encoding parameters where the dimension is 

922 the qubit index. 

923 

924 Returns: 

925 A numpy array of the output of f applied to each batch of 

926 samples in params, enc_params, and inputs. 

927 """ 

928 n_processes = 1 

929 # batches available? 

930 combined_batch_size = math.prod(self.batch_shape) 

931 if ( 

932 combined_batch_size > 1 

933 and self.mp_threshold > 0 

934 and combined_batch_size > self.mp_threshold 

935 ): 

936 n_processes = math.ceil(combined_batch_size / self.mp_threshold) 

937 # check if single process 

938 if n_processes == 1: 

939 if self.mp_threshold > 0: 

940 warnings.warn( 

941 f"Multiprocessing threshold {self.mp_threshold}>0, but using \ 

942 single process, because {combined_batch_size} samples per batch.", 

943 ) 

944 result = f(params=params, inputs=inputs, enc_params=enc_params) 

945 else: 

946 log.info(f"Using {n_processes} processes") 

947 mpp = MultiprocessingPool( 

948 target=Model._parallel_f, 

949 n_processes=n_processes, 

950 cpu_scaler=self.cpu_scaler, 

951 batch_size=self.mp_threshold, 

952 f=f, 

953 params=params, 

954 enc_params=enc_params, 

955 inputs=inputs, 

956 batch_shape=self.batch_shape, 

957 ) 

958 return_dict = mpp.spawn() 

959 

960 # TODO: the following code could use some optimization 

961 result = [None] * len(return_dict) 

962 for k, v in return_dict.items(): 

963 result[k] = v 

964 

965 result = np.concat(result, axis=1 if self.execution_type == "expval" else 0) 

966 return result 

967 

968 def _assimilate_batch(self, inputs, params): 

969 batch_shape = ( 

970 inputs.shape[0], 

971 params.shape[2] if len(params.shape) == 3 else 1, 

972 ) 

973 

974 if ( 

975 batch_shape[1] != 1 

976 and batch_shape[0] != batch_shape[1] 

977 and batch_shape[0] > 1 

978 ): 

979 # the following code does some dirty reshaping 

980 # TODO: optimize but be aware of the rabbit hole 

981 # key is to get the right "order" in which we repeat 

982 

983 # [BI,D] -> [BPxBI,D] 

984 inputs = np.repeat(inputs, batch_shape[1], axis=0) 

985 

986 # this is a tricky one, essentially we want to get 

987 # [L,Q,BP] -> [L,Q,BI,BP] -> [L,Q,BPxBI] 

988 params = np.repeat( 

989 params[:, :, np.newaxis, :], batch_shape[0], axis=2 

990 ).reshape([*params.shape[:-1], np.prod(batch_shape)]) 

991 

992 return inputs, params, batch_shape 

993 

994 def _requires_density(self): 

995 """ 

996 Checks if the current model requires density matrix simulation or not 

997 based on the noise_params variable and the execution type 

998 

999 Returns: 

1000 bool: True if model requires density simulation 

1001 """ 

1002 if self.execution_type == "density": 

1003 return True 

1004 

1005 if self.noise_params is not None: 

1006 coherent_noise = ["GateError"] 

1007 for k, v in self.noise_params.items(): 

1008 if k in coherent_noise: 

1009 continue 

1010 if v is not None and v > 0: 

1011 return True 

1012 return False 

1013 

1014 def __call__( 

1015 self, 

1016 params: Optional[np.ndarray] = None, 

1017 inputs: Optional[np.ndarray] = None, 

1018 enc_params: Optional[np.ndarray] = None, 

1019 noise_params: Optional[Dict[str, Union[float, Dict[str, float]]]] = None, 

1020 cache: Optional[bool] = False, 

1021 execution_type: Optional[str] = None, 

1022 force_mean: bool = False, 

1023 ) -> np.ndarray: 

1024 """ 

1025 Perform a forward pass of the quantum circuit. 

1026 

1027 Args: 

1028 params (Optional[np.ndarray]): Weight vector of shape 

1029 [n_layers, n_qubits*n_params_per_layer]. 

1030 If None, model internal parameters are used. 

1031 inputs (Optional[np.ndarray]): Input vector of shape [1]. 

1032 If None, zeros are used. 

1033 enc_params (Optional[np.ndarray]): Weight vector of shape 

1034 [n_qubits, n_input_features]. If None, model internal encoding 

1035 parameters are used. 

1036 noise_params (Optional[Dict[str, float]], optional): The noise parameters. 

1037 Defaults to None which results in the last 

1038 set noise parameters being used. 

1039 cache (Optional[bool], optional): Whether to cache the results. 

1040 Defaults to False. 

1041 execution_type (str, optional): The type of execution. 

1042 Must be one of 'expval', 'density', or 'probs'. 

1043 Defaults to None which results in the last set execution type 

1044 being used. 

1045 force_mean (bool, optional): Whether to average 

1046 when performing n-local measurements. 

1047 Defaults to False. 

1048 

1049 Returns: 

1050 np.ndarray: The output of the quantum circuit. 

1051 The shape depends on the execution_type. 

1052 - If execution_type is 'expval', returns an ndarray of shape 

1053 (1,) if output_qubit is -1, else (len(output_qubit),). 

1054 - If execution_type is 'density', returns an ndarray 

1055 of shape (2**n_qubits, 2**n_qubits). 

1056 - If execution_type is 'probs', returns an ndarray 

1057 of shape (2**n_qubits,) if output_qubit is -1, else 

1058 (2**len(output_qubit),). 

1059 """ 

1060 # Call forward method which handles the actual caching etc. 

1061 return self._forward( 

1062 params=params, 

1063 inputs=inputs, 

1064 enc_params=enc_params, 

1065 noise_params=noise_params, 

1066 cache=cache, 

1067 execution_type=execution_type, 

1068 force_mean=force_mean, 

1069 ) 

1070 

1071 def _forward( 

1072 self, 

1073 params: Optional[np.ndarray] = None, 

1074 inputs: Optional[np.ndarray] = None, 

1075 enc_params: Optional[np.ndarray] = None, 

1076 noise_params: Optional[Dict[str, Union[float, Dict[str, float]]]] = None, 

1077 cache: Optional[bool] = False, 

1078 execution_type: Optional[str] = None, 

1079 force_mean: bool = False, 

1080 ) -> np.ndarray: 

1081 """ 

1082 Perform a forward pass of the quantum circuit. 

1083 

1084 Args: 

1085 params (Optional[np.ndarray]): Weight vector of shape 

1086 [n_layers, n_qubits*n_params_per_layer]. 

1087 If None, model internal parameters are used. 

1088 inputs (Optional[np.ndarray]): Input vector of shape [1]. 

1089 If None, zeros are used. 

1090 enc_params (Optional[np.ndarray]): Weight vector of shape 

1091 [n_qubits, n_input_features]. If None, model internal encoding 

1092 parameters are used. 

1093 noise_params (Optional[Dict[str, float]], optional): The noise parameters. 

1094 Defaults to None which results in the last 

1095 set noise parameters being used. 

1096 cache (Optional[bool], optional): Whether to cache the results. 

1097 Defaults to False. 

1098 execution_type (str, optional): The type of execution. 

1099 Must be one of 'expval', 'density', or 'probs'. 

1100 Defaults to None which results in the last set execution type 

1101 being used. 

1102 force_mean (bool, optional): Whether to average 

1103 when performing n-local measurements. 

1104 Defaults to False. 

1105 

1106 Returns: 

1107 np.ndarray: The output of the quantum circuit. 

1108 The shape depends on the execution_type. 

1109 - If execution_type is 'expval', returns an ndarray of shape 

1110 (1,) if output_qubit is -1, else (len(output_qubit),). 

1111 - If execution_type is 'density', returns an ndarray 

1112 of shape (2**n_qubits, 2**n_qubits). 

1113 - If execution_type is 'probs', returns an ndarray 

1114 of shape (2**n_qubits,) if output_qubit is -1, else 

1115 (2**len(output_qubit),). 

1116 

1117 Raises: 

1118 NotImplementedError: If the number of shots is not None or if the 

1119 expectation value is True. 

1120 """ 

1121 # set the parameters as object attributes 

1122 if noise_params is not None: 

1123 self.noise_params = noise_params 

1124 if execution_type is not None: 

1125 self.execution_type = execution_type 

1126 

1127 params = self._params_validation(params) 

1128 inputs = self._inputs_validation(inputs) 

1129 enc_params = self._enc_params_validation(enc_params) 

1130 inputs, params, self.batch_shape = self._assimilate_batch(inputs, params) 

1131 # the qasm representation contains the bound parameters, 

1132 # thus it is ok to hash that 

1133 hs = hashlib.md5( 

1134 repr( 

1135 { 

1136 "n_qubits": self.n_qubits, 

1137 "n_layers": self.n_layers, 

1138 "pqc": self.pqc.__class__.__name__, 

1139 "dru": self.data_reupload, 

1140 "params": self.params, # use safe-params 

1141 "enc_params": self.enc_params, 

1142 "noise_params": self.noise_params, 

1143 "execution_type": self.execution_type, 

1144 "inputs": inputs, 

1145 "output_qubit": self.output_qubit, 

1146 } 

1147 ).encode("utf-8") 

1148 ).hexdigest() 

1149 

1150 result: Optional[np.ndarray] = None 

1151 if cache: 

1152 name: str = f"pqc_{hs}.npy" 

1153 

1154 cache_folder: str = ".cache" 

1155 if not os.path.exists(cache_folder): 

1156 os.mkdir(cache_folder) 

1157 

1158 file_path: str = os.path.join(cache_folder, name) 

1159 

1160 if os.path.isfile(file_path): 

1161 result = np.load(file_path) 

1162 

1163 if result is None: 

1164 # if density matrix requested or noise params used 

1165 if self._requires_density(): 

1166 result = self._mp_executor( 

1167 f=self.circuit_mixed, 

1168 params=params, # use arraybox params 

1169 inputs=inputs, 

1170 enc_params=enc_params, 

1171 ) 

1172 else: 

1173 if not isinstance(self.circuit, qml.QNode): 

1174 result = self.circuit( 

1175 inputs=inputs, 

1176 ) 

1177 else: 

1178 result = self._mp_executor( 

1179 f=self.circuit, 

1180 params=params, # use arraybox params 

1181 inputs=inputs, 

1182 enc_params=enc_params, 

1183 ) 

1184 

1185 if isinstance(result, list): 

1186 result = np.stack(result) 

1187 

1188 if self.execution_type == "expval" and force_mean and self.output_qubit == -1: 

1189 # exception for torch layer because it swaps batch and output dimension 

1190 if not isinstance(self.circuit, qml.QNode): 

1191 result = result.mean(axis=-1) 

1192 else: 

1193 result = result.mean(axis=0) 

1194 elif self.execution_type == "probs" and force_mean and self.output_qubit == -1: 

1195 # exception for torch layer because it swaps batch and output dimension 

1196 if not isinstance(self.circuit, qml.QNode): 

1197 result = result[..., -1].sum(axis=-1) 

1198 else: 

1199 result = result[1:, ...].sum(axis=0) 

1200 

1201 if self.batch_shape[0] > 1 and self.batch_shape[1] > 1: 

1202 result = result.reshape(-1, *self.batch_shape) 

1203 

1204 result = result.squeeze() 

1205 

1206 if cache: 

1207 np.save(file_path, result) 

1208 

1209 return result 

1210 

def get_specs(self, inputs: Optional[np.ndarray] = None) -> dict:
    """
    Return the PennyLane specs of the model's circuit without noise.

    The specs are taken from the circuit of a deep copy of the model whose
    noise parameters are cleared, so the model itself is not modified.
    The circuit is called with the model's own parameters.

    Args:
        inputs (Optional[np.ndarray]): The inputs, with which to call the
            circuit. Defaults to None.

    Returns:
        dict: Dictionary of specs. The key "resources" contains information
            about the circuit size and gate statistics.
    """
    validated_inputs = self._inputs_validation(inputs)

    noiseless_model = deepcopy(self)
    noiseless_model.noise_params = None  # remove noise

    specs_fn = qml.specs(noiseless_model.circuit)
    return specs_fn(self.params, validated_inputs)

1227 

def get_circuit_depth(self, inputs: Optional[np.ndarray] = None) -> int:
    """
    Return the depth of the model's circuit.

    The value is read from the "resources" entry of get_specs.

    Args:
        inputs (Optional[np.ndarray]): The inputs, with which to call the
            circuit. Defaults to None.

    Returns:
        int: Circuit depth (longest path of gates in circuit.)
    """
    specs = self.get_specs(inputs)
    resources = specs["resources"]
    return resources.depth

1239 return self.get_specs(inputs)["resources"].depth