Coverage for qml_essentials/model.py: 56%

388 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-06-10 13:05 +0000

1from typing import Dict, Optional, Tuple, Callable, Union, List 

2import pennylane as qml 

3import pennylane.numpy as np 

4import hashlib 

5import os 

6import warnings 

7from autograd.numpy import numpy_boxes 

8from copy import deepcopy 

9import math 

10 

11from qml_essentials.ansaetze import Gates, Ansaetze, Circuit 

12from qml_essentials.utils import PauliCircuit, QuanTikz, MultiprocessingPool 

13 

14 

15import logging 

16 

17log = logging.getLogger(__name__) 

18 

19 

20class Model: 

21 """ 

22 A quantum circuit model. 

23 """ 

24 

25 lightning_threshold = 12 

26 

27 def __init__( 

28 self, 

29 n_qubits: int, 

30 n_layers: int, 

31 circuit_type: Union[str, Circuit] = "No_Ansatz", 

32 data_reupload: Union[bool, List[bool], List[List[bool]]] = True, 

33 state_preparation: Union[str, Callable, List[str], List[Callable]] = None, 

34 encoding: Union[str, Callable, List[str], List[Callable]] = Gates.RX, 

35 trainable_frequencies: bool = False, 

36 initialization: str = "random", 

37 initialization_domain: List[float] = [0, 2 * np.pi], 

38 output_qubit: Union[List[int], int] = -1, 

39 shots: Optional[int] = None, 

40 random_seed: int = 1000, 

41 as_pauli_circuit: bool = False, 

42 remove_zero_encoding: bool = True, 

43 mp_threshold: int = -1, 

44 ) -> None: 

45 """ 

46 Initialize the quantum circuit model. 

47 Parameters will have the shape [impl_n_layers, parameters_per_layer] 

48 where impl_n_layers is the number of layers provided and added by one 

49 depending if data_reupload is True and parameters_per_layer is given by 

50 the chosen ansatz. 

51 

52 The model is initialized with the following parameters as defaults: 

53 - noise_params: None 

54 - execution_type: "expval" 

55 - shots: None 

56 

57 Args: 

58 n_qubits (int): The number of qubits in the circuit. 

59 n_layers (int): The number of layers in the circuit. 

60 circuit_type (str, Circuit): The type of quantum circuit to use. 

61 If None, defaults to "no_ansatz". 

62 data_reupload (Union[bool, List[bool], List[List[bool]]], optional): 

63 Whether to reupload data to the quantum device on each 

64 layer and qubit. Detailed re-uploading instructions can be given 

65 as a list/array of 0/False and 1/True with shape (n_qubits, 

66 n_layers) to specify where to upload the data. Defaults to True 

67 for applying data re-uploading to the full circuit. 

68 encoding (Union[str, Callable, List[str], List[Callable]], optional): 

69 The unitary to use for encoding the input data. Can be a string 

70 (e.g. "RX") or a callable (e.g. qml.RX). Defaults to qml.RX. 

71 If input is multidimensional it is assumed to be a list of 

72 unitaries or a list of strings. 

73 trainable_frequencies (bool, optional): 

74 Sets trainable encoding parameters for trainable frequencies. 

75 Defaults to False. 

76 initialization (str, optional): The strategy to initialize the parameters. 

77 Can be "random", "zeros", "zero-controlled", "pi", or "pi-controlled". 

78 Defaults to "random". 

79 output_qubit (List[int], int, optional): The index of the output 

80 qubit (or qubits). When set to -1 all qubits are measured, or a 

81 global measurement is conducted, depending on the execution 

82 type. 

83 shots (Optional[int], optional): The number of shots to use for 

84 the quantum device. Defaults to None. 

85 random_seed (int, optional): seed for the random number generator 

86 in initialization is "random" and for random noise parameters. 

87 Defaults to 1000. 

88 as_pauli_circuit (bool, optional): whether the circuit is 

89 transformed to a Pauli-Clifford circuit as described by Nemkov 

90 et al. (https://doi.org/10.1103/PhysRevA.108.032406), which is 

91 required for analytical Fourier coefficient computation. 

92 Defaults to False. 

93 remove_zero_encoding (bool, optional): whether to 

94 remove the zero encoding from the circuit. Defaults to True. 

95 mp_threshold (int, optional): threshold above which the parameter 

96 batch dimension is split across multiple processes. 

97 Defaults to -1. 

98 

99 Returns: 

100 None 

101 """ 

102 # Initialize default parameters needed for circuit evaluation 

103 self.noise_params: Optional[Dict[str, Union[float, Dict[str, float]]]] = None 

104 self.execution_type: Optional[str] = "expval" 

105 self.shots = shots 

106 self.remove_zero_encoding = remove_zero_encoding 

107 self.mp_threshold = mp_threshold 

108 self.n_qubits: int = n_qubits 

109 self.n_layers: int = n_layers 

110 self.trainable_frequencies: bool = trainable_frequencies 

111 

112 if isinstance(output_qubit, list): 

113 assert ( 

114 len(output_qubit) <= n_qubits 

115 ), f"Size of output_qubit {len(output_qubit)} cannot be\ 

116 larger than number of qubits {n_qubits}." 

117 self.output_qubit: Union[List[int], int] = output_qubit 

118 

119 # Initialize rng in Gates 

120 Gates.init_rng(random_seed) 

121 

122 # --- State Preparation --- 

123 # first check if we have a str, list or callable 

124 if isinstance(state_preparation, str): 

125 # if str, use the pennylane fct 

126 self._sp = [getattr(Gates, f"{state_preparation}")] 

127 elif isinstance(state_preparation, list): 

128 # if list, check if str or callable 

129 if isinstance(state_preparation[0], str): 

130 self._sp = [getattr(Gates, f"{sp}") for sp in state_preparation] 

131 else: 

132 self._sp = state_preparation 

133 elif state_preparation is None: 

134 self._sp = [lambda *args, **kwargs: None] 

135 else: 

136 # default to callable 

137 self._sp = [state_preparation] 

138 

139 # --- Encoding --- 

140 # first check if we have a str, list or callable 

141 if isinstance(encoding, str): 

142 # if str, use the pennylane fct 

143 self._enc = [getattr(Gates, f"{encoding}")] 

144 elif isinstance(encoding, list): 

145 # if list, check if str or callable 

146 if isinstance(encoding[0], str): 

147 self._enc = [getattr(Gates, f"{enc}") for enc in encoding] 

148 else: 

149 self._enc = encoding 

150 else: 

151 # default to callable 

152 self._enc = [encoding] 

153 

154 # Number of possible inputs 

155 self.n_input_feat = len(self._enc) 

156 log.info(f"Number of input features: {self.n_input_feat}") 

157 

158 # Trainable frequencies, default initialization as in arXiv:2309.03279v2 

159 self.enc_params = np.ones( 

160 (self.n_qubits, self.n_input_feat), requires_grad=trainable_frequencies 

161 ) 

162 

163 # --- Data-Reuploading --- 

164 # Process data reuploading strategy and set degree 

165 if not isinstance(data_reupload, bool): 

166 if not isinstance(data_reupload, np.ndarray): 

167 data_reupload = np.array(data_reupload) 

168 if data_reupload.shape == ( 

169 n_layers, 

170 n_qubits, 

171 ): 

172 data_reupload = data_reupload.reshape(*data_reupload.shape, 1) 

173 data_reupload = np.repeat(data_reupload, self.n_input_feat, axis=2) 

174 

175 assert data_reupload.shape == ( 

176 n_layers, 

177 n_qubits, 

178 self.n_input_feat, 

179 ), f"Data reuploading array has wrong shape. \ 

180 Expected {(n_layers, n_qubits)} or\ 

181 {(n_layers, n_qubits, self.n_input_feat)},\ 

182 got {data_reupload.shape}." 

183 

184 log.debug(f"Data reuploading array:\n{data_reupload}") 

185 else: 

186 if data_reupload: 

187 impl_n_layers: int = ( 

188 n_layers + 1 

189 ) # we need L+1 according to Schuld et al. 

190 data_reupload = np.ones((n_layers, n_qubits, self.n_input_feat)) 

191 log.debug("Full data reuploading.") 

192 else: 

193 impl_n_layers: int = n_layers 

194 data_reupload = np.zeros((n_layers, n_qubits, self.n_input_feat)) 

195 data_reupload[0][0] = 1 

196 log.debug("No data reuploading.") 

197 

198 # convert to boolean values 

199 self.data_reupload = data_reupload.astype(bool) 

200 self.frequencies = [ 

201 np.count_nonzero(self.data_reupload[..., i]) 

202 for i in range(self.n_input_feat) 

203 ] 

204 

205 if self.degree > 1: 

206 impl_n_layers: int = n_layers + 1 # we need L+1 according to Schuld et al. 

207 else: 

208 impl_n_layers = n_layers 

209 log.info(f"Number of implicit layers: {impl_n_layers}.") 

210 

211 # --- Ansatz --- 

212 # only weak check for str. We trust the user to provide sth useful 

213 if isinstance(circuit_type, str): 

214 self.pqc: Callable[[Optional[np.ndarray], int], int] = getattr( 

215 Ansaetze, circuit_type or "No_Ansatz" 

216 )() 

217 else: 

218 self.pqc = circuit_type() 

219 log.info(f"Using Ansatz {circuit_type}.") 

220 

221 # calculate the shape of the parameter vector here, we will re-use this in init. 

222 params_per_layer = self.pqc.n_params_per_layer(self.n_qubits) 

223 self._params_shape: Tuple[int, int] = (impl_n_layers, params_per_layer) 

224 log.info(f"Parameters per layer: {params_per_layer}") 

225 

226 self.batch_shape = (1, 1) 

227 # this will also be re-used in the init method, 

228 # however, only if nothing is provided 

229 self._inialization_strategy = initialization 

230 self._initialization_domain = initialization_domain 

231 

232 # ..here! where we only require a rng 

233 self.initialize_params(np.random.default_rng(random_seed)) 

234 

235 # Initialize two circuits, one with the default device and 

236 # one with the mixed device 

237 # which allows us to later route depending on the state_vector flag 

238 self.as_pauli_circuit = as_pauli_circuit 

239 

240 self.circuit_mixed: qml.QNode = qml.QNode( 

241 self._circuit, 

242 qml.device("default.mixed", shots=self.shots, wires=self.n_qubits), 

243 ) 

244 

245 @property 

246 def degree(self): 

247 return max(self.frequencies) 

248 

249 @property 

250 def as_pauli_circuit(self) -> bool: 

251 return self._as_pauli_circuit 

252 

253 @as_pauli_circuit.setter 

254 def as_pauli_circuit(self, value: bool) -> None: 

255 self._as_pauli_circuit = value 

256 

257 if self.n_qubits < self.lightning_threshold: 

258 device = "default.qubit" 

259 else: 

260 device = "lightning.qubit" 

261 self.mp_threshold = -1 

262 

263 self.circuit: qml.QNode = qml.QNode( 

264 self._circuit, 

265 qml.device( 

266 device, 

267 shots=self.shots, 

268 wires=self.n_qubits, 

269 ), 

270 interface="autograd" if self.shots is not None else "auto", 

271 diff_method="parameter-shift" if self.shots is not None else "best", 

272 ) 

273 

274 if value: 

275 pauli_circuit_transform = qml.transform( 

276 PauliCircuit.from_parameterised_circuit 

277 ) 

278 self.circuit = pauli_circuit_transform(self.circuit) 

279 

280 @property 

281 def noise_params(self) -> Optional[Dict[str, Union[float, Dict[str, float]]]]: 

282 """ 

283 Gets the noise parameters of the model. 

284 

285 Returns: 

286 Optional[Dict[str, float]]: A dictionary of 

287 noise parameters or None if not set. 

288 """ 

289 return self._noise_params 

290 

291 @noise_params.setter 

292 def noise_params( 

293 self, kvs: Optional[Dict[str, Union[float, Dict[str, float]]]] 

294 ) -> None: 

295 """ 

296 Sets the noise parameters of the model. 

297 

298 Typically a "noise parameter" refers to the error probability. 

299 ThermalRelaxation is a special case, and supports a dict as value with 

300 structure: 

301 "ThermalRelaxation": 

302 { 

303 "t1": 2000, # relative t1 time. 

304 "t2": 1000, # relative t2 time 

305 "t_factor" 1: # relative gate time factor 

306 }, 

307 

308 Args: 

309 value (Optional[Dict[str, Union[float, Dict[str, float]]]]): A 

310 dictionary of noise parameters. If all values are 0.0, the noise 

311 parameters are set to None. 

312 

313 Returns: 

314 None 

315 """ 

316 # set to None if only zero values provided 

317 if kvs is not None and all(np == 0.0 for np in kvs.values()): 

318 kvs = None 

319 

320 # set default values 

321 if kvs is not None: 

322 kvs.setdefault("BitFlip", 0.0) 

323 kvs.setdefault("PhaseFlip", 0.0) 

324 kvs.setdefault("Depolarizing", 0.0) 

325 kvs.setdefault("AmplitudeDamping", 0.0) 

326 kvs.setdefault("PhaseDamping", 0.0) 

327 kvs.setdefault("GateError", 0.0) 

328 kvs.setdefault("ThermalRelaxation", None) 

329 kvs.setdefault("StatePreparation", 0.0) 

330 kvs.setdefault("Measurement", 0.0) 

331 

332 # check if there are any keys not supported 

333 for key in kvs.keys(): 

334 if key not in [ 

335 "BitFlip", 

336 "PhaseFlip", 

337 "Depolarizing", 

338 "AmplitudeDamping", 

339 "PhaseDamping", 

340 "GateError", 

341 "ThermalRelaxation", 

342 "StatePreparation", 

343 "Measurement", 

344 ]: 

345 warnings.warn( 

346 f"Noise type {key} is not supported by this package", 

347 UserWarning, 

348 ) 

349 

350 # check valid params for thermal relaxation noise channel 

351 tr_params = kvs["ThermalRelaxation"] 

352 if isinstance(tr_params, dict): 

353 tr_params.setdefault("t1", 0.0) 

354 tr_params.setdefault("t2", 0.0) 

355 tr_params.setdefault("t_factor", 0.0) 

356 for k in tr_params.keys(): 

357 if k not in [ 

358 "t1", 

359 "t2", 

360 "t_factor", 

361 ]: 

362 warnings.warn( 

363 f"Thermal Relaxation parameter {k} is not supported " 

364 f"by this package", 

365 UserWarning, 

366 ) 

367 if not all(tr_params.values()) or tr_params["t2"] > 2 * tr_params["t1"]: 

368 warnings.warn( 

369 "Received invalid values for Thermal Relaxation noise " 

370 "parameter. Thermal relaxation is not applied!", 

371 UserWarning, 

372 ) 

373 kvs["ThermalRelaxation"] = 0.0 

374 

375 self._noise_params = kvs 

376 

377 @property 

378 def execution_type(self) -> str: 

379 """ 

380 Gets the execution type of the model. 

381 

382 Returns: 

383 str: The execution type, one of 'density', 'expval', or 'probs'. 

384 """ 

385 return self._execution_type 

386 

387 @execution_type.setter 

388 def execution_type(self, value: str) -> None: 

389 if value not in ["density", "state", "expval", "probs"]: 

390 raise ValueError(f"Invalid execution type: {value}.") 

391 

392 if (value == "density" or value == "state") and self.output_qubit != -1: 

393 warnings.warn( 

394 f"{value} measurement does ignore output_qubit, which is " 

395 f"{self.output_qubit}.", 

396 UserWarning, 

397 ) 

398 

399 if value == "probs" and self.shots is None: 

400 warnings.warn( 

401 "Setting execution_type to probs without specifying shots.", 

402 UserWarning, 

403 ) 

404 

405 if value == "density" and self.shots is not None: 

406 warnings.warn( 

407 "Setting execution_type to density with specified shots.", 

408 UserWarning, 

409 ) 

410 

411 self._execution_type = value 

412 

413 @property 

414 def shots(self) -> Optional[int]: 

415 """ 

416 Gets the number of shots to use for the quantum device. 

417 

418 Returns: 

419 Optional[int]: The number of shots. 

420 """ 

421 return self._shots 

422 

423 @shots.setter 

424 def shots(self, value: Optional[int]) -> None: 

425 """ 

426 Sets the number of shots to use for the quantum device. 

427 

428 Args: 

429 value (Optional[int]): The number of shots. 

430 If an integer less than or equal to 0 is provided, it is set to None. 

431 

432 Returns: 

433 None 

434 """ 

435 if type(value) is int and value <= 0: 

436 value = None 

437 self._shots = value 

438 

439 def initialize_params( 

440 self, 

441 rng: np.random.Generator, 

442 repeat: int = None, 

443 initialization: str = None, 

444 initialization_domain: List[float] = None, 

445 ) -> None: 

446 """ 

447 Initializes the parameters of the model. 

448 

449 Args: 

450 rng: A random number generator to use for initialization. 

451 repeat: The number of times to repeat the parameters. 

452 If None, the number of layers is used. 

453 initialization: The strategy to use for parameter initialization. 

454 If None, the strategy specified in the constructor is used. 

455 initialization_domain: The domain to use for parameter initialization. 

456 If None, the domain specified in the constructor is used. 

457 

458 Returns: 

459 None 

460 """ 

461 params_shape = ( 

462 self._params_shape if repeat is None else [*self._params_shape, repeat] 

463 ) 

464 # use existing strategy if not specified 

465 initialization = initialization or self._inialization_strategy 

466 initialization_domain = initialization_domain or self._initialization_domain 

467 

468 def set_control_params(params: np.ndarray, value: float) -> np.ndarray: 

469 indices = self.pqc.get_control_indices(self.n_qubits) 

470 if indices is None: 

471 warnings.warn( 

472 f"Specified {initialization} but circuit\ 

473 does not contain controlled rotation gates.\ 

474 Parameters are intialized randomly.", 

475 UserWarning, 

476 ) 

477 else: 

478 params[:, indices[0] : indices[1] : indices[2]] = ( 

479 np.ones_like(params[:, indices[0] : indices[1] : indices[2]]) 

480 * value 

481 ) 

482 return params 

483 

484 if initialization == "random": 

485 self.params: np.ndarray = rng.uniform( 

486 *initialization_domain, params_shape, requires_grad=True 

487 ) 

488 elif initialization == "zeros": 

489 self.params: np.ndarray = np.zeros(params_shape, requires_grad=True) 

490 elif initialization == "pi": 

491 self.params: np.ndarray = np.ones(params_shape, requires_grad=True) * np.pi 

492 elif initialization == "zero-controlled": 

493 self.params: np.ndarray = rng.uniform( 

494 *initialization_domain, params_shape, requires_grad=True 

495 ) 

496 self.params = set_control_params(self.params, 0) 

497 elif initialization == "pi-controlled": 

498 self.params: np.ndarray = rng.uniform( 

499 *initialization_domain, params_shape, requires_grad=True 

500 ) 

501 self.params = set_control_params(self.params, np.pi) 

502 else: 

503 raise Exception("Invalid initialization method") 

504 

505 log.info( 

506 f"Initialized parameters with shape {self.params.shape}\ 

507 using strategy {initialization}." 

508 ) 

509 

510 def transform_input(self, inputs: np.ndarray, enc_params: Optional[np.ndarray]): 

511 """ 

512 Transforms the input as in arXiv:2309.03279v2 

513 

514 Args: 

515 inputs (np.ndarray): single input point of shape (1, n_input_feat) 

516 idx (int): feature index 

517 qubit (int): qubit on which to the encoding is being performed 

518 enc_params (np.ndarray): encoding weight vector of 

519 shape (n_qubits) 

520 

521 Returns: 

522 np.ndarray: transformed input of shape (1,), linearly scaled by 

523 enc_params, ready for encoding 

524 """ 

525 return inputs * enc_params 

526 

527 def _iec( 

528 self, 

529 inputs: np.ndarray, 

530 data_reupload: np.ndarray, 

531 enc: Union[Callable, List[Callable]], 

532 enc_params: np.ndarray, 

533 noise_params: Optional[Dict[str, Union[float, Dict[str, float]]]] = None, 

534 ) -> None: 

535 """ 

536 Creates an AngleEncoding using RX gates 

537 

538 Args: 

539 inputs (np.ndarray): single input point of shape (1, n_input_feat) 

540 data_reupload (np.ndarray): Boolean array to indicate positions in 

541 the circuit for data re-uploading for the IEC, shape is 

542 (n_qubits, n_layers). 

543 enc: Callable or List[Callable]: encoding function or list of encoding 

544 functions 

545 enc_params (np.ndarray): encoding weight vector 

546 of shape [n_qubits, n_inputs] 

547 noise_params (Optional[Dict[str, Union[float, Dict[str, float]]]]): 

548 The noise parameters. 

549 Returns: 

550 None 

551 """ 

552 # check for zero, because due to input validation, input cannot be none 

553 if self.remove_zero_encoding and not inputs.any(): 

554 return 

555 

556 for q in range(self.n_qubits): 

557 for idx in range(inputs.shape[1]): 

558 if data_reupload[q, idx]: 

559 enc[idx]( 

560 self.transform_input(inputs[:, idx], enc_params[q, idx]), 

561 wires=q, 

562 noise_params=noise_params, 

563 ) 

564 

565 def _circuit( 

566 self, 

567 params: np.ndarray, 

568 inputs: np.ndarray, 

569 enc_params: Optional[np.ndarray] = None, 

570 ) -> Union[float, np.ndarray]: 

571 """ 

572 Creates a circuit with noise. 

573 

574 Args: 

575 params (np.ndarray): weight vector of shape 

576 [n_layers, n_qubits*(n_params_per_layer+trainable_frequencies)] 

577 inputs (np.ndarray): input vector of size 1 

578 enc_params Optional[np.ndarray]: encoding weight vector 

579 of shape [n_qubits, n_inputs] 

580 Returns: 

581 Union[float, np.ndarray]: Expectation value of PauliZ(0) 

582 of the circuit if state_vector is False and expval is True, 

583 otherwise the density matrix of all qubits. 

584 """ 

585 

586 self._variational(params=params, inputs=inputs, enc_params=enc_params) 

587 return self._observable() 

588 

589 def _variational(self, params, inputs, enc_params=None): 

590 if enc_params is None: 

591 warnings.warn( 

592 "Explicit call to `_circuit` or `_variational` detected: " 

593 "`enc_params` is None, using `self.enc_params` instead.", 

594 RuntimeWarning, 

595 ) 

596 enc_params = self.enc_params 

597 

598 if self.noise_params is not None: 

599 self._apply_state_prep_noise() 

600 

601 for q in range(self.n_qubits): 

602 for _sp in self._sp: 

603 _sp(wires=q, noise_params=self.noise_params) 

604 

605 for layer in range(0, self.n_layers): 

606 self.pqc(params[layer], self.n_qubits, noise_params=self.noise_params) 

607 

608 self._iec( 

609 inputs, 

610 data_reupload=self.data_reupload[layer], 

611 enc=self._enc, 

612 enc_params=enc_params, 

613 noise_params=self.noise_params, 

614 ) 

615 

616 if self.degree > 1: 

617 qml.Barrier(wires=list(range(self.n_qubits)), only_visual=True) 

618 

619 if self.degree > 1: # same check as in init 

620 self.pqc(params[-1], self.n_qubits, noise_params=self.noise_params) 

621 

622 if self.noise_params is not None: 

623 self._apply_general_noise() 

624 

625 def _observable(self): 

626 # run mixed simualtion and get density matrix 

627 if self.execution_type == "density": 

628 return qml.density_matrix(wires=list(range(self.n_qubits))) 

629 elif self.execution_type == "state": 

630 return qml.state() 

631 # run default simulation and get expectation value 

632 elif self.execution_type == "expval": 

633 # n-local measurement 

634 if self.output_qubit == -1: 

635 return [qml.expval(qml.PauliZ(q)) for q in range(self.n_qubits)] 

636 # local measurement(s) 

637 elif isinstance(self.output_qubit, int): 

638 return qml.expval(qml.PauliZ(self.output_qubit)) 

639 # parity measurenment 

640 elif isinstance(self.output_qubit, list): 

641 obs = qml.PauliZ(self.output_qubit[0]) 

642 for out_qubit in self.output_qubit[1:]: 

643 obs = obs @ qml.PauliZ(out_qubit) 

644 return qml.expval(obs) 

645 else: 

646 raise ValueError( 

647 f"Invalid parameter `output_qubit`: {self.output_qubit}.\ 

648 Must be int, list or -1." 

649 ) 

650 # run default simulation and get probs 

651 elif self.execution_type == "probs": 

652 if self.output_qubit == -1: 

653 return qml.probs(wires=list(range(self.n_qubits))) 

654 else: 

655 return qml.probs(wires=self.output_qubit) 

656 else: 

657 raise ValueError(f"Invalid execution_type: {self.execution_type}.") 

658 

659 def _apply_state_prep_noise(self) -> None: 

660 """ 

661 Applies a state preparation error on each qubit according to the 

662 probability for StatePreparation provided in the noise_params. 

663 """ 

664 sp = self.noise_params.get("StatePreparation", 0.0) 

665 for q in range(self.n_qubits): 

666 if sp > 0: 

667 qml.BitFlip(sp, wires=q) 

668 

669 def _apply_general_noise(self) -> None: 

670 """ 

671 Applies general types of noise the full circuit (in contrast to gate 

672 errors, applied directly at gate level, see Gates.Noise). 

673 

674 Possible types of noise are: 

675 - AmplitudeDamping (specified through probability) 

676 - PhaseDamping (specified through probability) 

677 - ThermalRelaxation (specified through a dict, containing keys 

678 "t1", "t2", "t_factor") 

679 - Measurement (specified through probability) 

680 """ 

681 amp_damp = self.noise_params.get("AmplitudeDamping", 0.0) 

682 phase_damp = self.noise_params.get("PhaseDamping", 0.0) 

683 thermal_relax = self.noise_params.get("ThermalRelaxation", 0.0) 

684 meas = self.noise_params.get("Measurement", 0.0) 

685 for q in range(self.n_qubits): 

686 if amp_damp > 0: 

687 qml.AmplitudeDamping(amp_damp, wires=q) 

688 if phase_damp > 0: 

689 qml.PhaseDamping(phase_damp, wires=q) 

690 if meas > 0: 

691 qml.BitFlip(meas, wires=q) 

692 if isinstance(thermal_relax, dict): 

693 t1 = thermal_relax["t1"] 

694 t2 = thermal_relax["t2"] 

695 t_factor = thermal_relax["t_factor"] 

696 circuit_depth = self.get_circuit_depth() 

697 tg = circuit_depth * t_factor 

698 qml.ThermalRelaxationError(1.0, t1, t2, tg, q) 

699 

700 def draw(self, inputs=None, figure="text", *args, **kwargs): 

701 """ 

702 Draws the quantum circuit using the specified visualization method. 

703 

704 Args: 

705 inputs (Optional[np.ndarray]): Input vector for the circuit. If None, 

706 the default inputs are used. 

707 figure (str, optional): The type of figure to generate. Must be one of 

708 'text', 'mpl', or 'tikz'. Defaults to 'text'. 

709 Returns: 

710 Either a string, matplotlib figure or TikzFigure object (similar to string) 

711 depending on the chosen visualization. 

712 *args: 

713 Additional arguments to be passed to the visualization method. 

714 **kwargs: 

715 Additional keyword arguments to be passed to the visualization method. 

716 

717 Raises: 

718 AssertionError: If the 'figure' argument is not one of the accepted values. 

719 """ 

720 

721 if not isinstance(self.circuit, qml.QNode): 

722 # TODO: throws strange argument error if not catched 

723 return "" 

724 

725 assert figure in [ 

726 "text", 

727 "mpl", 

728 "tikz", 

729 ], f"Invalid figure: {figure}. Must be 'text', 'mpl' or 'tikz'." 

730 

731 inputs = self._inputs_validation(inputs) 

732 

733 if figure == "mpl": 

734 result = qml.draw_mpl(self.circuit)( 

735 params=self.params, 

736 inputs=inputs, 

737 enc_params=self.enc_params, 

738 *args, 

739 **kwargs, 

740 ) 

741 elif figure == "tikz": 

742 result = QuanTikz.build( 

743 self.circuit, 

744 params=self.params, 

745 inputs=inputs, 

746 enc_params=self.enc_params, 

747 *args, 

748 **kwargs, 

749 ) 

750 else: 

751 result = qml.draw(self.circuit)( 

752 params=self.params, inputs=inputs, enc_params=self.enc_params 

753 ) 

754 return result 

755 

756 def __repr__(self) -> str: 

757 return self.draw(figure="text") 

758 

759 def __str__(self) -> str: 

760 return self.draw(figure="text") 

761 

762 def _params_validation(self, params) -> np.ndarray: 

763 """ 

764 Sets the parameters when calling the quantum circuit 

765 

766 Args: 

767 params (np.ndarray): The parameters used for the call 

768 """ 

769 if params is None: 

770 params = self.params 

771 else: 

772 if numpy_boxes.ArrayBox == type(params): 

773 self.params = params._value 

774 else: 

775 self.params = params 

776 return params 

777 

778 def _enc_params_validation(self, enc_params) -> np.ndarray: 

779 """ 

780 Sets the encoding parameters when calling the quantum circuit 

781 

782 Args: 

783 enc_params (np.ndarray): The encoding parameters used for the call 

784 """ 

785 if enc_params is None: 

786 enc_params = self.enc_params 

787 else: 

788 if isinstance(enc_params, numpy_boxes.ArrayBox): 

789 if self.trainable_frequencies: 

790 self.enc_params = enc_params._value 

791 else: 

792 self.enc_params = np.array( 

793 enc_params._value, requires_grad=self.trainable_frequencies 

794 ) 

795 else: 

796 if self.trainable_frequencies: 

797 self.enc_params = enc_params 

798 else: 

799 self.enc_params = np.array( 

800 enc_params, requires_grad=self.trainable_frequencies 

801 ) 

802 

803 if len(enc_params.shape) == 1 and self.n_input_feat == 1: 

804 enc_params = enc_params.reshape(-1, 1) 

805 elif len(enc_params.shape) == 1 and self.n_input_feat > 1: 

806 raise ValueError( 

807 f"Input dimension {self.n_input_feat} >1 but \ 

808 `enc_params` has shape {enc_params.shape}" 

809 ) 

810 

811 return enc_params 

812 

813 def _inputs_validation( 

814 self, inputs: Union[None, List, float, int, np.ndarray] 

815 ) -> np.ndarray: 

816 """ 

817 Validate the inputs to be a 2D numpy array of shape (batch_size, n_inputs). 

818 

819 Args: 

820 inputs (Union[None, List, float, int, np.ndarray]): The input to validate. 

821 

822 Returns: 

823 np.ndarray: The validated input. 

824 """ 

825 if inputs is None: 

826 # initialize to zero 

827 inputs = np.array([[0] * self.n_input_feat]) 

828 elif isinstance(inputs, List): 

829 inputs = np.stack(inputs) 

830 elif isinstance(inputs, float) or isinstance(inputs, int): 

831 inputs = np.array([inputs]) 

832 

833 if len(inputs.shape) <= 1: 

834 if self.n_input_feat == 1: 

835 # add a batch dimension 

836 inputs = inputs.reshape(-1, 1) 

837 else: 

838 if inputs.shape[0] == self.n_input_feat: 

839 inputs = inputs.reshape(1, -1) 

840 else: 

841 inputs = inputs.reshape(-1, 1) 

842 inputs = inputs.repeat(self.n_input_feat, axis=1) 

843 warnings.warn( 

844 f"Expected {self.n_input_feat} inputs, but {inputs.shape[0]} " 

845 "was provided, replicating input for all input features.", 

846 UserWarning, 

847 ) 

848 else: 

849 if inputs.shape[1] != self.n_input_feat: 

850 raise ValueError( 

851 f"Wrong number of inputs provided. Expected {self.n_input_feat} " 

852 f"inputs, but input has shape {inputs.shape}." 

853 ) 

854 

855 return inputs 

856 

857 @staticmethod 

858 def _parallel_f( 

859 procnum, 

860 result, 

861 f, 

862 batch_size, 

863 params, 

864 inputs, 

865 batch_shape, 

866 enc_params, 

867 ): 

868 """ 

869 Helper function for parallelizing a function f over parameters. 

870 Sices the batch dimension based on the procnum and batch size. 

871 

872 Args: 

873 procnum: The process number. 

874 result: The result array. 

875 f: The function to be parallelized. 

876 batch_size: The batch size. 

877 params: The parameters array. 

878 inputs: The inputs array. 

879 enc_params: The encoding parameters array. 

880 """ 

881 min_idx = max(procnum * batch_size, 0) 

882 

883 if batch_shape[0] > 1: 

884 max_idx = min((procnum + 1) * batch_size, inputs.shape[0]) 

885 inputs = inputs[min_idx:max_idx] 

886 if batch_shape[1] > 1: 

887 max_idx = min((procnum + 1) * batch_size, params.shape[2]) 

888 params = params[:, :, min_idx:max_idx] 

889 

890 result[procnum] = f(params=params, inputs=inputs, enc_params=enc_params) 

891 

892 def _mp_executor(self, f, params, inputs, enc_params): 

893 """ 

894 Execute a function f in parallel over parameters. 

895 

896 Args: 

897 f: A function that takes two arguments, params and inputs, 

898 and returns a numpy array. 

899 params: A 3D numpy array of parameters where the first dimension is 

900 the layer index, the second dimension is the parameter index in 

901 the layer, and the third dimension is the sample index. 

902 inputs: A 2D numpy array of inputs where the first dimension is 

903 the sample index and the second dimension is the input feature index. 

904 enc_params: A 1D numpy array of encoding parameters where the dimension is 

905 the qubit index. 

906 

907 Returns: 

908 A numpy array of the output of f applied to each batch of 

909 samples in params, enc_params, and inputs. 

910 """ 

911 n_processes = 1 

912 # batches available? 

913 combined_batch_size = math.prod(self.batch_shape) 

914 if ( 

915 combined_batch_size > 1 

916 and self.mp_threshold > 0 

917 and combined_batch_size > self.mp_threshold 

918 ): 

919 n_processes = math.ceil(combined_batch_size / self.mp_threshold) 

920 # check if single process 

921 if n_processes == 1: 

922 result = f(params=params, inputs=inputs, enc_params=enc_params) 

923 else: 

924 log.info(f"Using {n_processes} processes") 

925 mpp = MultiprocessingPool( 

926 n_processes=n_processes, 

927 target=Model._parallel_f, 

928 batch_size=self.mp_threshold, 

929 f=f, 

930 params=params, 

931 enc_params=enc_params, 

932 inputs=inputs, 

933 batch_shape=self.batch_shape, 

934 ) 

935 return_dict = mpp.spawn() 

936 result = [None] * len(return_dict) 

937 for k, v in return_dict.items(): 

938 result[k] = v 

939 

940 result = np.concat(result, axis=1 if self.execution_type == "expval" else 0) 

941 return result 

942 

943 def _assimilate_batch(self, inputs, params): 

944 batch_shape = ( 

945 inputs.shape[0], 

946 params.shape[2] if len(params.shape) == 3 else 1, 

947 ) 

948 

949 if ( 

950 batch_shape[1] != 1 

951 and batch_shape[0] != batch_shape[1] 

952 and batch_shape[0] > 1 

953 ): 

954 # the following code does some dirty reshaping 

955 # TODO: optimize but be aware of the rabbit hole 

956 # key is to get the right "order" in which we repeat 

957 

958 # [BI,D] -> [BPxBI,D] 

959 inputs = np.repeat(inputs, batch_shape[1], axis=0) 

960 

961 # this is a tricky one, essentially we want to get 

962 # [L,Q,BP] -> [L,Q,BI,BP] -> [L,Q,BPxBI] 

963 params = np.repeat( 

964 params[:, :, np.newaxis, :], batch_shape[0], axis=2 

965 ).reshape([*params.shape[:-1], np.prod(batch_shape)]) 

966 

967 return inputs, params, batch_shape 

968 

969 def __call__( 

970 self, 

971 params: Optional[np.ndarray] = None, 

972 inputs: Optional[np.ndarray] = None, 

973 enc_params: Optional[np.ndarray] = None, 

974 noise_params: Optional[Dict[str, Union[float, Dict[str, float]]]] = None, 

975 cache: Optional[bool] = False, 

976 execution_type: Optional[str] = None, 

977 force_mean: bool = False, 

978 ) -> np.ndarray: 

979 """ 

980 Perform a forward pass of the quantum circuit. 

981 

982 Args: 

983 params (Optional[np.ndarray]): Weight vector of shape 

984 [n_layers, n_qubits*n_params_per_layer]. 

985 If None, model internal parameters are used. 

986 inputs (Optional[np.ndarray]): Input vector of shape [1]. 

987 If None, zeros are used. 

988 enc_params (Optional[np.ndarray]): Weight vector of shape 

989 [n_qubits, n_input_features]. If None, model internal encoding 

990 parameters are used. 

991 noise_params (Optional[Dict[str, float]], optional): The noise parameters. 

992 Defaults to None which results in the last 

993 set noise parameters being used. 

994 cache (Optional[bool], optional): Whether to cache the results. 

995 Defaults to False. 

996 execution_type (str, optional): The type of execution. 

997 Must be one of 'expval', 'density', or 'probs'. 

998 Defaults to None which results in the last set execution type 

999 being used. 

1000 force_mean (bool, optional): Whether to average 

1001 when performing n-local measurements. 

1002 Defaults to False. 

1003 

1004 Returns: 

1005 np.ndarray: The output of the quantum circuit. 

1006 The shape depends on the execution_type. 

1007 - If execution_type is 'expval', returns an ndarray of shape 

1008 (1,) if output_qubit is -1, else (len(output_qubit),). 

1009 - If execution_type is 'density', returns an ndarray 

1010 of shape (2**n_qubits, 2**n_qubits). 

1011 - If execution_type is 'probs', returns an ndarray 

1012 of shape (2**n_qubits,) if output_qubit is -1, else 

1013 (2**len(output_qubit),). 

1014 """ 

1015 # Call forward method which handles the actual caching etc. 

1016 return self._forward( 

1017 params=params, 

1018 inputs=inputs, 

1019 enc_params=enc_params, 

1020 noise_params=noise_params, 

1021 cache=cache, 

1022 execution_type=execution_type, 

1023 force_mean=force_mean, 

1024 ) 

1025 

1026 def _forward( 

1027 self, 

1028 params: Optional[np.ndarray] = None, 

1029 inputs: Optional[np.ndarray] = None, 

1030 enc_params: Optional[np.ndarray] = None, 

1031 noise_params: Optional[Dict[str, Union[float, Dict[str, float]]]] = None, 

1032 cache: Optional[bool] = False, 

1033 execution_type: Optional[str] = None, 

1034 force_mean: bool = False, 

1035 ) -> np.ndarray: 

1036 """ 

1037 Perform a forward pass of the quantum circuit. 

1038 

1039 Args: 

1040 params (Optional[np.ndarray]): Weight vector of shape 

1041 [n_layers, n_qubits*n_params_per_layer]. 

1042 If None, model internal parameters are used. 

1043 inputs (Optional[np.ndarray]): Input vector of shape [1]. 

1044 If None, zeros are used. 

1045 enc_params (Optional[np.ndarray]): Weight vector of shape 

1046 [n_qubits, n_input_features]. If None, model internal encoding 

1047 parameters are used. 

1048 noise_params (Optional[Dict[str, float]], optional): The noise parameters. 

1049 Defaults to None which results in the last 

1050 set noise parameters being used. 

1051 cache (Optional[bool], optional): Whether to cache the results. 

1052 Defaults to False. 

1053 execution_type (str, optional): The type of execution. 

1054 Must be one of 'expval', 'density', or 'probs'. 

1055 Defaults to None which results in the last set execution type 

1056 being used. 

1057 force_mean (bool, optional): Whether to average 

1058 when performing n-local measurements. 

1059 Defaults to False. 

1060 

1061 Returns: 

1062 np.ndarray: The output of the quantum circuit. 

1063 The shape depends on the execution_type. 

1064 - If execution_type is 'expval', returns an ndarray of shape 

1065 (1,) if output_qubit is -1, else (len(output_qubit),). 

1066 - If execution_type is 'density', returns an ndarray 

1067 of shape (2**n_qubits, 2**n_qubits). 

1068 - If execution_type is 'probs', returns an ndarray 

1069 of shape (2**n_qubits,) if output_qubit is -1, else 

1070 (2**len(output_qubit),). 

1071 

1072 Raises: 

1073 NotImplementedError: If the number of shots is not None or if the 

1074 expectation value is True. 

1075 """ 

1076 # set the parameters as object attributes 

1077 if noise_params is not None: 

1078 self.noise_params = noise_params 

1079 if execution_type is not None: 

1080 self.execution_type = execution_type 

1081 

1082 params = self._params_validation(params) 

1083 inputs = self._inputs_validation(inputs) 

1084 enc_params = self._enc_params_validation(enc_params) 

1085 inputs, params, self.batch_shape = self._assimilate_batch(inputs, params) 

1086 # the qasm representation contains the bound parameters, 

1087 # thus it is ok to hash that 

1088 hs = hashlib.md5( 

1089 repr( 

1090 { 

1091 "n_qubits": self.n_qubits, 

1092 "n_layers": self.n_layers, 

1093 "pqc": self.pqc.__class__.__name__, 

1094 "dru": self.data_reupload, 

1095 "params": self.params, # use safe-params 

1096 "enc_params": self.enc_params, 

1097 "noise_params": self.noise_params, 

1098 "execution_type": self.execution_type, 

1099 "inputs": inputs, 

1100 "output_qubit": self.output_qubit, 

1101 } 

1102 ).encode("utf-8") 

1103 ).hexdigest() 

1104 

1105 result: Optional[np.ndarray] = None 

1106 if cache: 

1107 name: str = f"pqc_{hs}.npy" 

1108 

1109 cache_folder: str = ".cache" 

1110 if not os.path.exists(cache_folder): 

1111 os.mkdir(cache_folder) 

1112 

1113 file_path: str = os.path.join(cache_folder, name) 

1114 

1115 if os.path.isfile(file_path): 

1116 result = np.load(file_path) 

1117 

1118 if result is None: 

1119 # if density matrix requested or noise params used 

1120 if self.execution_type == "density" or self.noise_params is not None: 

1121 result = self._mp_executor( 

1122 f=self.circuit_mixed, 

1123 params=params, # use arraybox params 

1124 inputs=inputs, 

1125 enc_params=enc_params, 

1126 ) 

1127 else: 

1128 if not isinstance(self.circuit, qml.QNode): 

1129 result = self.circuit( 

1130 inputs=inputs, 

1131 ) 

1132 else: 

1133 result = self._mp_executor( 

1134 f=self.circuit, 

1135 params=params, # use arraybox params 

1136 inputs=inputs, 

1137 enc_params=enc_params, 

1138 ) 

1139 

1140 if isinstance(result, list): 

1141 result = np.stack(result) 

1142 

1143 if self.execution_type == "expval" and force_mean and self.output_qubit == -1: 

1144 # exception for torch layer because it swaps batch and output dimension 

1145 if not isinstance(self.circuit, qml.QNode): 

1146 result = result.mean(axis=-1) 

1147 else: 

1148 result = result.mean(axis=0) 

1149 elif self.execution_type == "probs" and force_mean and self.output_qubit == -1: 

1150 # exception for torch layer because it swaps batch and output dimension 

1151 if not isinstance(self.circuit, qml.QNode): 

1152 result = result[..., -1].sum(axis=-1) 

1153 else: 

1154 result = result[1:, ...].sum(axis=0) 

1155 

1156 if self.batch_shape[0] > 1 and self.batch_shape[1] > 1: 

1157 result = result.reshape(-1, *self.batch_shape) 

1158 

1159 result = result.squeeze() 

1160 

1161 if cache: 

1162 np.save(file_path, result) 

1163 

1164 return result 

1165 

1166 def get_specs(self, inputs: Optional[np.ndarray] = None) -> dict: 

1167 """ 

1168 Get pennylane specs for the model. 

1169 

1170 Args: 

1171 inputs (Optional[np.ndarray]): The inputs, with which to call the 

1172 circuit. Defaults to None. 

1173 

1174 Returns: 

1175 dict: Dictionary of specs. The key "resources" contains information 

1176 about the circuit size and gate statistics. 

1177 """ 

1178 inputs = self._inputs_validation(inputs) 

1179 spec_model = deepcopy(self) 

1180 spec_model.noise_params = None # remove noise 

1181 return qml.specs(spec_model.circuit)(self.params, inputs) 

1182 

1183 def get_circuit_depth(self, inputs: Optional[np.ndarray] = None) -> int: 

1184 """ 

1185 Obtain circuit depth for the model 

1186 

1187 Args: 

1188 inputs (Optional[np.ndarray]): The inputs, with which to call the 

1189 circuit. Defaults to None. 

1190 

1191 Returns: 

1192 int: Circuit depth (longest path of gates in circuit.) 

1193 """ 

1194 return self.get_specs(inputs)["resources"].depth