Coverage for qml_essentials/model.py: 55%

359 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-15 15:48 +0000

1from typing import Dict, Optional, Tuple, Callable, Union, List 

2import pennylane as qml 

3import pennylane.numpy as np 

4import hashlib 

5import os 

6import warnings 

7from autograd.numpy import numpy_boxes 

8from copy import deepcopy 

9import math 

10 

11from qml_essentials.ansaetze import Gates, Ansaetze, Circuit 

12from qml_essentials.utils import PauliCircuit, QuanTikz, MultiprocessingPool 

13 

14 

15import logging 

16 

17log = logging.getLogger(__name__) 

18 

19 

20class Model: 

21 """ 

22 A quantum circuit model. 

23 """ 

24 

25 lightning_threshold = 12 

26 

27 def __init__( 

28 self, 

29 n_qubits: int, 

30 n_layers: int, 

31 circuit_type: Union[str, Circuit], 

32 data_reupload: Union[bool, List[int]] = True, 

33 state_preparation: Union[str, Callable, List[str], List[Callable]] = None, 

34 encoding: Union[str, Callable, List[str], List[Callable]] = Gates.RX, 

35 initialization: str = "random", 

36 initialization_domain: List[float] = [0, 2 * np.pi], 

37 output_qubit: Union[List[int], int] = -1, 

38 shots: Optional[int] = None, 

39 random_seed: int = 1000, 

40 as_pauli_circuit: bool = False, 

41 remove_zero_encoding: bool = True, 

42 mp_threshold: int = -1, 

43 ) -> None: 

44 """ 

45 Initialize the quantum circuit model. 

46 Parameters will have the shape [impl_n_layers, parameters_per_layer] 

47 where impl_n_layers is the number of layers provided and added by one 

48 depending if data_reupload is True and parameters_per_layer is given by 

49 the chosen ansatz. 

50 

51 The model is initialized with the following parameters as defaults: 

52 - noise_params: None 

53 - execution_type: "expval" 

54 - shots: None 

55 

56 Args: 

57 n_qubits (int): The number of qubits in the circuit. 

58 n_layers (int): The number of layers in the circuit. 

59 circuit_type (str, Circuit): The type of quantum circuit to use. 

60 If None, defaults to "no_ansatz". 

61 data_reupload (bool, optional): Whether to reupload data to the 

62 quantum device on each measurement. Defaults to True. 

63 encoding (Union[str, Callable, List[str], List[Callable]], optional): 

64 The unitary to use for encoding the input data. Can be a string 

65 (e.g. "RX") or a callable (e.g. qml.RX). Defaults to qml.RX. 

66 If input is multidimensional it is assumed to be a list of 

67 unitaries or a list of strings. 

68 initialization (str, optional): The strategy to initialize the parameters. 

69 Can be "random", "zeros", "zero-controlled", "pi", or "pi-controlled". 

70 Defaults to "random". 

71 output_qubit (List[int], int, optional): The index of the output 

72 qubit (or qubits). When set to -1 all qubits are measured, or a 

73 global measurement is conducted, depending on the execution 

74 type. 

75 shots (Optional[int], optional): The number of shots to use for 

76 the quantum device. Defaults to None. 

77 random_seed (int, optional): seed for the random number generator 

78 in initialization is "random" and for random noise parameters. 

79 Defaults to 1000. 

80 as_pauli_circuit (bool, optional): whether the circuit is 

81 transformed to a Pauli-Clifford circuit as described by Nemkov 

82 et al. (https://doi.org/10.1103/PhysRevA.108.032406), which is 

83 required for analytical Fourier coefficient computation. 

84 Defaults to False. 

85 remove_zero_encoding (bool, optional): whether to 

86 remove the zero encoding from the circuit. Defaults to True. 

87 mp_threshold (int, optional): threshold above which the parameter 

88 batch dimension is split across multiple processes. 

89 Defaults to -1. 

90 

91 Returns: 

92 None 

93 """ 

94 # Initialize default parameters needed for circuit evaluation 

95 self.noise_params: Optional[Dict[str, Union[float, Dict[str, float]]]] = None 

96 self.execution_type: Optional[str] = "expval" 

97 self.shots = shots 

98 self.remove_zero_encoding = remove_zero_encoding 

99 self.mp_threshold = mp_threshold 

100 

101 if isinstance(output_qubit, list): 

102 assert ( 

103 len(output_qubit) <= n_qubits 

104 ), f"Size of output_qubit {len(output_qubit)} cannot be\ 

105 larger than number of qubits {n_qubits}." 

106 self.output_qubit: Union[List[int], int] = output_qubit 

107 

108 # Copy the parameters 

109 self.n_qubits: int = n_qubits 

110 self.n_layers: int = n_layers 

111 

112 # Process data reuploading strategy and set degree 

113 if not isinstance(data_reupload, bool): 

114 if not isinstance(data_reupload, np.ndarray): 

115 data_reupload = np.array(data_reupload) 

116 assert data_reupload.shape == (n_layers, n_qubits) 

117 else: 

118 if data_reupload: 

119 impl_n_layers: int = ( 

120 n_layers + 1 

121 ) # we need L+1 according to Schuld et al. 

122 data_reupload = np.ones((n_layers, n_qubits)) 

123 else: 

124 impl_n_layers: int = n_layers 

125 data_reupload = np.zeros((n_layers, n_qubits)) 

126 data_reupload[0][0] = 1 

127 

128 self.degree = np.count_nonzero(data_reupload) 

129 self.data_reupload = data_reupload 

130 

131 if self.degree > 1: 

132 impl_n_layers: int = n_layers + 1 # we need L+1 according to Schuld et al. 

133 else: 

134 impl_n_layers = n_layers 

135 

136 # Initialize ansatz 

137 # only weak check for str. We trust the user to provide sth useful 

138 if isinstance(circuit_type, str): 

139 self.pqc: Callable[[Optional[np.ndarray], int], int] = getattr( 

140 Ansaetze, circuit_type or "No_Ansatz" 

141 )() 

142 else: 

143 self.pqc = circuit_type() 

144 

145 # Initialize rng in Gates 

146 Gates.init_rng(random_seed) 

147 

148 # Initialize state preparation 

149 # first check if we have a str, list or callable 

150 if isinstance(state_preparation, str): 

151 # if str, use the pennylane fct 

152 self._sp = [getattr(Gates, f"{state_preparation}")] 

153 elif isinstance(state_preparation, list): 

154 # if list, check if str or callable 

155 if isinstance(state_preparation[0], str): 

156 self._sp = [getattr(Gates, f"{sp}") for sp in state_preparation] 

157 else: 

158 self._sp = state_preparation 

159 elif state_preparation is None: 

160 self._sp = [lambda *args, **kwargs: None] 

161 else: 

162 # default to callable 

163 self._sp = [state_preparation] 

164 

165 # Initialize encoding 

166 # first check if we have a str, list or callable 

167 if isinstance(encoding, str): 

168 # if str, use the pennylane fct 

169 self._enc = getattr(Gates, f"{encoding}") 

170 elif isinstance(encoding, list): 

171 # if list, check if str or callable 

172 if isinstance(encoding[0], str): 

173 self._enc = [getattr(Gates, f"{enc}") for enc in encoding] 

174 else: 

175 self._enc = encoding 

176 

177 if len(self._enc) == 1: 

178 self._enc = self._enc[0] 

179 else: 

180 # default to callable 

181 self._enc = encoding 

182 

183 # Number of possible inputs 

184 self.n_input_feat = len(encoding) if isinstance(encoding, List) else 1 

185 

186 log.info(f"Using {circuit_type} circuit.") 

187 

188 log.info(f"Number of implicit layers set to {impl_n_layers}.") 

189 # calculate the shape of the parameter vector here, we will re-use this in init. 

190 self._params_shape: Tuple[int, int] = ( 

191 impl_n_layers, 

192 self.pqc.n_params_per_layer(self.n_qubits), 

193 ) 

194 self.batch_shape = (1, 1) 

195 # this will also be re-used in the init method, 

196 # however, only if nothing is provided 

197 self._inialization_strategy = initialization 

198 self._initialization_domain = initialization_domain 

199 

200 # ..here! where we only require a rng 

201 self.initialize_params(np.random.default_rng(random_seed)) 

202 

203 # Initialize two circuits, one with the default device and 

204 # one with the mixed device 

205 # which allows us to later route depending on the state_vector flag 

206 self.as_pauli_circuit = as_pauli_circuit 

207 

208 self.circuit_mixed: qml.QNode = qml.QNode( 

209 self._circuit, 

210 qml.device("default.mixed", shots=self.shots, wires=self.n_qubits), 

211 ) 

212 

213 @property 

214 def as_pauli_circuit(self) -> bool: 

215 return self._as_pauli_circuit 

216 

217 @as_pauli_circuit.setter 

218 def as_pauli_circuit(self, value: bool) -> None: 

219 self._as_pauli_circuit = value 

220 

221 if self.n_qubits < self.lightning_threshold: 

222 device = "default.qubit" 

223 else: 

224 device = "lightning.qubit" 

225 self.mp_threshold = -1 

226 

227 self.circuit: qml.QNode = qml.QNode( 

228 self._circuit, 

229 qml.device( 

230 device, 

231 shots=self.shots, 

232 wires=self.n_qubits, 

233 ), 

234 interface="autograd" if self.shots is not None else "auto", 

235 diff_method="parameter-shift" if self.shots is not None else "best", 

236 ) 

237 

238 if value: 

239 pauli_circuit_transform = qml.transform( 

240 PauliCircuit.from_parameterised_circuit 

241 ) 

242 self.circuit = pauli_circuit_transform(self.circuit) 

243 

244 @property 

245 def noise_params(self) -> Optional[Dict[str, Union[float, Dict[str, float]]]]: 

246 """ 

247 Gets the noise parameters of the model. 

248 

249 Returns: 

250 Optional[Dict[str, float]]: A dictionary of 

251 noise parameters or None if not set. 

252 """ 

253 return self._noise_params 

254 

255 @noise_params.setter 

256 def noise_params( 

257 self, kvs: Optional[Dict[str, Union[float, Dict[str, float]]]] 

258 ) -> None: 

259 """ 

260 Sets the noise parameters of the model. 

261 

262 Typically a "noise parameter" refers to the error probability. 

263 ThermalRelaxation is a special case, and supports a dict as value with 

264 structure: 

265 "ThermalRelaxation": 

266 { 

267 "t1": 2000, # relative t1 time. 

268 "t2": 1000, # relative t2 time 

269 "t_factor" 1: # relative gate time factor 

270 }, 

271 

272 Args: 

273 value (Optional[Dict[str, Union[float, Dict[str, float]]]]): A 

274 dictionary of noise parameters. If all values are 0.0, the noise 

275 parameters are set to None. 

276 

277 Returns: 

278 None 

279 """ 

280 # set to None if only zero values provided 

281 if kvs is not None and all(np == 0.0 for np in kvs.values()): 

282 kvs = None 

283 

284 # set default values 

285 if kvs is not None: 

286 kvs.setdefault("BitFlip", 0.0) 

287 kvs.setdefault("PhaseFlip", 0.0) 

288 kvs.setdefault("Depolarizing", 0.0) 

289 kvs.setdefault("AmplitudeDamping", 0.0) 

290 kvs.setdefault("PhaseDamping", 0.0) 

291 kvs.setdefault("GateError", 0.0) 

292 kvs.setdefault("ThermalRelaxation", None) 

293 kvs.setdefault("StatePreparation", 0.0) 

294 kvs.setdefault("Measurement", 0.0) 

295 

296 # check if there are any keys not supported 

297 for key in kvs.keys(): 

298 if key not in [ 

299 "BitFlip", 

300 "PhaseFlip", 

301 "Depolarizing", 

302 "AmplitudeDamping", 

303 "PhaseDamping", 

304 "GateError", 

305 "ThermalRelaxation", 

306 "StatePreparation", 

307 "Measurement", 

308 ]: 

309 warnings.warn( 

310 f"Noise type {key} is not supported by this package", 

311 UserWarning, 

312 ) 

313 

314 # check valid params for thermal relaxation noise channel 

315 tr_params = kvs["ThermalRelaxation"] 

316 if isinstance(tr_params, dict): 

317 tr_params.setdefault("t1", 0.0) 

318 tr_params.setdefault("t2", 0.0) 

319 tr_params.setdefault("t_factor", 0.0) 

320 for k in tr_params.keys(): 

321 if k not in [ 

322 "t1", 

323 "t2", 

324 "t_factor", 

325 ]: 

326 warnings.warn( 

327 f"Thermal Relaxation parameter {k} is not supported " 

328 f"by this package", 

329 UserWarning, 

330 ) 

331 if not all(tr_params.values()) or tr_params["t2"] > 2 * tr_params["t1"]: 

332 warnings.warn( 

333 "Received invalid values for Thermal Relaxation noise " 

334 "parameter. Thermal relaxation is not applied!", 

335 UserWarning, 

336 ) 

337 kvs["ThermalRelaxation"] = 0.0 

338 

339 self._noise_params = kvs 

340 

341 @property 

342 def execution_type(self) -> str: 

343 """ 

344 Gets the execution type of the model. 

345 

346 Returns: 

347 str: The execution type, one of 'density', 'expval', or 'probs'. 

348 """ 

349 return self._execution_type 

350 

351 @execution_type.setter 

352 def execution_type(self, value: str) -> None: 

353 if value not in ["density", "state", "expval", "probs"]: 

354 raise ValueError(f"Invalid execution type: {value}.") 

355 

356 if (value == "density" or value == "state") and self.output_qubit != -1: 

357 warnings.warn( 

358 f"{value} measurement does ignore output_qubit, which is " 

359 f"{self.output_qubit}.", 

360 UserWarning, 

361 ) 

362 

363 if value == "probs" and self.shots is None: 

364 warnings.warn( 

365 "Setting execution_type to probs without specifying shots.", 

366 UserWarning, 

367 ) 

368 

369 if value == "density" and self.shots is not None: 

370 warnings.warn( 

371 "Setting execution_type to density with specified shots.", 

372 UserWarning, 

373 ) 

374 

375 self._execution_type = value 

376 

377 @property 

378 def shots(self) -> Optional[int]: 

379 """ 

380 Gets the number of shots to use for the quantum device. 

381 

382 Returns: 

383 Optional[int]: The number of shots. 

384 """ 

385 return self._shots 

386 

387 @shots.setter 

388 def shots(self, value: Optional[int]) -> None: 

389 """ 

390 Sets the number of shots to use for the quantum device. 

391 

392 Args: 

393 value (Optional[int]): The number of shots. 

394 If an integer less than or equal to 0 is provided, it is set to None. 

395 

396 Returns: 

397 None 

398 """ 

399 if type(value) is int and value <= 0: 

400 value = None 

401 self._shots = value 

402 

403 def initialize_params( 

404 self, 

405 rng: np.random.Generator, 

406 repeat: int = None, 

407 initialization: str = None, 

408 initialization_domain: List[float] = None, 

409 ) -> None: 

410 """ 

411 Initializes the parameters of the model. 

412 

413 Args: 

414 rng: A random number generator to use for initialization. 

415 repeat: The number of times to repeat the parameters. 

416 If None, the number of layers is used. 

417 initialization: The strategy to use for parameter initialization. 

418 If None, the strategy specified in the constructor is used. 

419 initialization_domain: The domain to use for parameter initialization. 

420 If None, the domain specified in the constructor is used. 

421 

422 Returns: 

423 None 

424 """ 

425 params_shape = ( 

426 self._params_shape if repeat is None else [*self._params_shape, repeat] 

427 ) 

428 # use existing strategy if not specified 

429 initialization = initialization or self._inialization_strategy 

430 initialization_domain = initialization_domain or self._initialization_domain 

431 

432 def set_control_params(params: np.ndarray, value: float) -> np.ndarray: 

433 indices = self.pqc.get_control_indices(self.n_qubits) 

434 if indices is None: 

435 warnings.warn( 

436 f"Specified {initialization} but circuit\ 

437 does not contain controlled rotation gates.\ 

438 Parameters are intialized randomly.", 

439 UserWarning, 

440 ) 

441 else: 

442 params[:, indices[0] : indices[1] : indices[2]] = ( 

443 np.ones_like(params[:, indices[0] : indices[1] : indices[2]]) 

444 * value 

445 ) 

446 return params 

447 

448 if initialization == "random": 

449 self.params: np.ndarray = rng.uniform( 

450 *initialization_domain, params_shape, requires_grad=True 

451 ) 

452 elif initialization == "zeros": 

453 self.params: np.ndarray = np.zeros(params_shape, requires_grad=True) 

454 elif initialization == "pi": 

455 self.params: np.ndarray = np.ones(params_shape, requires_grad=True) * np.pi 

456 elif initialization == "zero-controlled": 

457 self.params: np.ndarray = rng.uniform( 

458 *initialization_domain, params_shape, requires_grad=True 

459 ) 

460 self.params = set_control_params(self.params, 0) 

461 elif initialization == "pi-controlled": 

462 self.params: np.ndarray = rng.uniform( 

463 *initialization_domain, params_shape, requires_grad=True 

464 ) 

465 self.params = set_control_params(self.params, np.pi) 

466 else: 

467 raise Exception("Invalid initialization method") 

468 

469 log.info( 

470 f"Initialized parameters with shape {self.params.shape}\ 

471 using strategy {initialization}." 

472 ) 

473 

474 def _iec( 

475 self, 

476 inputs: np.ndarray, 

477 data_reupload: bool, 

478 enc: Union[Callable, List[Callable]], 

479 noise_params: Optional[Dict[str, Union[float, Dict[str, float]]]] = None, 

480 ) -> None: 

481 """ 

482 Creates an AngleEncoding using RX gates 

483 

484 Args: 

485 inputs (np.ndarray): length of vector must be 1, shape (1,) 

486 data_reupload (bool, optional): Whether to reupload the data 

487 for the IEC or not, default is True. 

488 

489 Returns: 

490 None 

491 """ 

492 # check for zero, because due to input validation, input cannot be none 

493 if self.remove_zero_encoding and not inputs.any(): 

494 return 

495 

496 # one dimensional encoding 

497 if inputs.shape[1] == 1: 

498 for q in range(self.n_qubits): 

499 if data_reupload[q]: 

500 enc(inputs[:, 0], wires=q, noise_params=noise_params) 

501 # multi dimensional encoding 

502 else: 

503 for q in range(self.n_qubits): 

504 if data_reupload[q]: 

505 for idx in range(inputs.shape[1]): 

506 enc[idx](inputs[:, idx], wires=q, noise_params=noise_params) 

507 

508 def _circuit( 

509 self, 

510 params: np.ndarray, 

511 inputs: np.ndarray, 

512 ) -> Union[float, np.ndarray]: 

513 """ 

514 Creates a circuit with noise. 

515 

516 Args: 

517 params (np.ndarray): weight vector of shape 

518 [n_layers, n_qubits*n_params_per_layer] 

519 inputs (np.ndarray): input vector of size 1 

520 Returns: 

521 Union[float, np.ndarray]: Expectation value of PauliZ(0) 

522 of the circuit if state_vector is False and expval is True, 

523 otherwise the density matrix of all qubits. 

524 """ 

525 self._variational(params=params, inputs=inputs) 

526 return self._observable() 

527 

528 def _variational(self, params, inputs): 

529 if self.noise_params is not None: 

530 self._apply_state_prep_noise() 

531 

532 for q in range(self.n_qubits): 

533 for _sp in self._sp: 

534 _sp(wires=q, noise_params=self.noise_params) 

535 

536 for layer in range(0, self.n_layers): 

537 self.pqc(params[layer], self.n_qubits, noise_params=self.noise_params) 

538 

539 self._iec( 

540 inputs, 

541 data_reupload=self.data_reupload[layer], 

542 enc=self._enc, 

543 noise_params=self.noise_params, 

544 ) 

545 

546 if self.degree > 1: 

547 qml.Barrier(wires=list(range(self.n_qubits)), only_visual=True) 

548 

549 if self.degree > 1: # same check as in init 

550 self.pqc(params[-1], self.n_qubits, noise_params=self.noise_params) 

551 

552 if self.noise_params is not None: 

553 self._apply_general_noise() 

554 

555 def _observable(self): 

556 # run mixed simualtion and get density matrix 

557 if self.execution_type == "density": 

558 return qml.density_matrix(wires=list(range(self.n_qubits))) 

559 elif self.execution_type == "state": 

560 return qml.state() 

561 # run default simulation and get expectation value 

562 elif self.execution_type == "expval": 

563 # n-local measurement 

564 if self.output_qubit == -1: 

565 return [qml.expval(qml.PauliZ(q)) for q in range(self.n_qubits)] 

566 # local measurement(s) 

567 elif isinstance(self.output_qubit, int): 

568 return qml.expval(qml.PauliZ(self.output_qubit)) 

569 # parity measurenment 

570 elif isinstance(self.output_qubit, list): 

571 obs = qml.PauliZ(self.output_qubit[0]) 

572 for out_qubit in self.output_qubit[1:]: 

573 obs = obs @ qml.PauliZ(out_qubit) 

574 return qml.expval(obs) 

575 else: 

576 raise ValueError( 

577 f"Invalid parameter 'output_qubit': {self.output_qubit}.\ 

578 Must be int, list or -1." 

579 ) 

580 # run default simulation and get probs 

581 elif self.execution_type == "probs": 

582 if self.output_qubit == -1: 

583 return qml.probs(wires=list(range(self.n_qubits))) 

584 else: 

585 return qml.probs(wires=self.output_qubit) 

586 else: 

587 raise ValueError(f"Invalid execution_type: {self.execution_type}.") 

588 

589 def _apply_state_prep_noise(self) -> None: 

590 """ 

591 Applies a state preparation error on each qubit according to the 

592 probability for StatePreparation provided in the noise_params. 

593 """ 

594 sp = self.noise_params.get("StatePreparation", 0.0) 

595 for q in range(self.n_qubits): 

596 if sp > 0: 

597 qml.BitFlip(sp, wires=q) 

598 

599 def _apply_general_noise(self) -> None: 

600 """ 

601 Applies general types of noise the full circuit (in contrast to gate 

602 errors, applied directly at gate level, see Gates.Noise). 

603 

604 Possible types of noise are: 

605 - AmplitudeDamping (specified through probability) 

606 - PhaseDamping (specified through probability) 

607 - ThermalRelaxation (specified through a dict, containing keys 

608 "t1", "t2", "t_factor") 

609 - Measurement (specified through probability) 

610 """ 

611 amp_damp = self.noise_params.get("AmplitudeDamping", 0.0) 

612 phase_damp = self.noise_params.get("PhaseDamping", 0.0) 

613 thermal_relax = self.noise_params.get("ThermalRelaxation", 0.0) 

614 meas = self.noise_params.get("Measurement", 0.0) 

615 for q in range(self.n_qubits): 

616 if amp_damp > 0: 

617 qml.AmplitudeDamping(amp_damp, wires=q) 

618 if phase_damp > 0: 

619 qml.PhaseDamping(phase_damp, wires=q) 

620 if meas > 0: 

621 qml.BitFlip(meas, wires=q) 

622 if isinstance(thermal_relax, dict): 

623 t1 = thermal_relax["t1"] 

624 t2 = thermal_relax["t2"] 

625 t_factor = thermal_relax["t_factor"] 

626 circuit_depth = self.get_circuit_depth() 

627 tg = circuit_depth * t_factor 

628 qml.ThermalRelaxationError(1.0, t1, t2, tg, q) 

629 

630 def draw(self, inputs=None, figure="text", *args, **kwargs): 

631 """ 

632 Draws the quantum circuit using the specified visualization method. 

633 

634 Args: 

635 inputs (Optional[np.ndarray]): Input vector for the circuit. If None, 

636 the default inputs are used. 

637 figure (str, optional): The type of figure to generate. Must be one of 

638 'text', 'mpl', or 'tikz'. Defaults to 'text'. 

639 Returns: 

640 Either a string, matplotlib figure or TikzFigure object (similar to string) 

641 depending on the chosen visualization. 

642 *args: 

643 Additional arguments to be passed to the visualization method. 

644 **kwargs: 

645 Additional keyword arguments to be passed to the visualization method. 

646 

647 Raises: 

648 AssertionError: If the 'figure' argument is not one of the accepted values. 

649 """ 

650 

651 if not isinstance(self.circuit, qml.QNode): 

652 # TODO: throws strange argument error if not catched 

653 return "" 

654 

655 assert figure in [ 

656 "text", 

657 "mpl", 

658 "tikz", 

659 ], f"Invalid figure: {figure}. Must be 'text', 'mpl' or 'tikz'." 

660 

661 inputs = self._inputs_validation(inputs) 

662 

663 if figure == "mpl": 

664 result = qml.draw_mpl(self.circuit)( 

665 params=self.params, inputs=inputs, *args, **kwargs 

666 ) 

667 elif figure == "tikz": 

668 result = QuanTikz.build( 

669 self.circuit, params=self.params, inputs=inputs, *args, **kwargs 

670 ) 

671 else: 

672 result = qml.draw(self.circuit)(params=self.params, inputs=inputs) 

673 return result 

674 

675 def __repr__(self) -> str: 

676 return self.draw(figure="text") 

677 

678 def __str__(self) -> str: 

679 return self.draw(figure="text") 

680 

681 def _params_validation(self, params) -> np.ndarray: 

682 """ 

683 Sets the parameters when calling the quantum circuit 

684 

685 Args: 

686 params (np.ndarray): The parameters used for the call 

687 """ 

688 if params is None: 

689 params = self.params 

690 else: 

691 if numpy_boxes.ArrayBox == type(params): 

692 self.params = params._value 

693 else: 

694 self.params = params 

695 return params 

696 

697 def _inputs_validation( 

698 self, inputs: Union[None, List, float, int, np.ndarray] 

699 ) -> np.ndarray: 

700 """ 

701 Validate the inputs to be a 2D numpy array of shape (batch_size, n_inputs). 

702 

703 Args: 

704 inputs (Union[None, List, float, int, np.ndarray]): The input to validate. 

705 

706 Returns: 

707 np.ndarray: The validated input. 

708 """ 

709 if inputs is None: 

710 # initialize to zero 

711 inputs = np.array([[0] * self.n_input_feat]) 

712 elif isinstance(inputs, List): 

713 inputs = np.stack(inputs) 

714 elif isinstance(inputs, float) or isinstance(inputs, int): 

715 inputs = np.array([inputs]) 

716 

717 if len(inputs.shape) <= 1: 

718 if self.n_input_feat == 1: 

719 # add a batch dimension 

720 inputs = inputs.reshape(-1, 1) 

721 else: 

722 if inputs.shape[0] == self.n_input_feat: 

723 inputs = inputs.reshape(1, -1) 

724 else: 

725 inputs = inputs.reshape(-1, 1) 

726 inputs = inputs.repeat(self.n_input_feat, axis=1) 

727 warnings.warn( 

728 f"Expected {self.n_input_feat} inputs, but {inputs.shape[0]} " 

729 "was provided, replicating input for all input features.", 

730 UserWarning, 

731 ) 

732 else: 

733 if inputs.shape[1] != self.n_input_feat: 

734 raise ValueError( 

735 f"Wrong number of inputs provided. Expected {self.n_input_feat} " 

736 f"inputs, but input has shape {inputs.shape}." 

737 ) 

738 

739 return inputs 

740 

741 @staticmethod 

742 def _parallel_f(procnum, result, f, batch_size, params, inputs, batch_shape): 

743 """ 

744 Helper function for parallelizing a function f over parameters. 

745 Sices the batch dimension based on the procnum and batch size. 

746 

747 Args: 

748 procnum: The process number. 

749 result: The result array. 

750 f: The function to be parallelized. 

751 batch_size: The batch size. 

752 params: The parameters array. 

753 inputs: The inputs array. 

754 """ 

755 min_idx = max(procnum * batch_size, 0) 

756 

757 if batch_shape[0] > 1: 

758 max_idx = min((procnum + 1) * batch_size, inputs.shape[0]) 

759 inputs = inputs[min_idx:max_idx] 

760 if batch_shape[1] > 1: 

761 max_idx = min((procnum + 1) * batch_size, params.shape[2]) 

762 params = params[:, :, min_idx:max_idx] 

763 

764 result[procnum] = f(params=params, inputs=inputs) 

765 

766 def _mp_executor(self, f, params, inputs): 

767 """ 

768 Execute a function f in parallel over parameters. 

769 

770 Args: 

771 f: A function that takes two arguments, params and inputs, 

772 and returns a numpy array. 

773 params: A 3D numpy array of parameters where the first dimension is 

774 the layer index, the second dimension is the parameter index in 

775 the layer, and the third dimension is the sample index. 

776 inputs: A 2D numpy array of inputs where the first dimension is 

777 the sample index and the second dimension is the input feature index. 

778 

779 Returns: 

780 A numpy array of the output of f applied to each batch of 

781 samples in params and inputs. 

782 """ 

783 n_processes = 1 

784 # batches available? 

785 if params is not None and len(params.shape) > 2: 

786 # sufficiently large for MP? 

787 if self.mp_threshold > 0 and params.shape[2] > self.mp_threshold: 

788 n_processes = math.ceil(params.shape[2] / self.mp_threshold) 

789 

790 # check if single process 

791 if n_processes == 1: 

792 result = f(params=params, inputs=inputs) 

793 else: 

794 log.info(f"Using {n_processes} processes") 

795 mpp = MultiprocessingPool( 

796 n_processes=n_processes, 

797 target=Model._parallel_f, 

798 batch_size=math.ceil(params.shape[2] / n_processes), 

799 f=f, 

800 params=params, 

801 inputs=inputs, 

802 batch_shape=self.batch_shape, 

803 ) 

804 return_dict = mpp.spawn() 

805 result = [None] * len(return_dict) 

806 for k, v in return_dict.items(): 

807 result[k] = v 

808 

809 result = np.concat(result, axis=1 if self.execution_type == "expval" else 0) 

810 

811 return result 

812 

813 def _assimilate_batch(self, inputs, params): 

814 batch_shape = ( 

815 inputs.shape[0], 

816 params.shape[2] if len(params.shape) == 3 else 1, 

817 ) 

818 

819 if ( 

820 batch_shape[1] != 1 

821 and batch_shape[0] != batch_shape[1] 

822 and batch_shape[0] > 1 

823 ): 

824 # the following code does some dirty reshaping 

825 # TODO: optimize but be aware of the rabbit hole 

826 # key is to get the right "order" in which we repeat 

827 

828 # [BI,D] -> [BPxBI,D] 

829 inputs = np.repeat(inputs, batch_shape[1], axis=0) 

830 

831 # this is a tricky one, essentially we want to get 

832 # [L,Q,BP] -> [L,Q,BI,BP] -> [L,Q,BPxBI] 

833 params = np.repeat( 

834 params[:, :, np.newaxis, :], batch_shape[0], axis=2 

835 ).reshape([*params.shape[:-1], np.prod(batch_shape)]) 

836 

837 return inputs, params, batch_shape 

838 

839 def __call__( 

840 self, 

841 params: Optional[np.ndarray] = None, 

842 inputs: Optional[np.ndarray] = None, 

843 noise_params: Optional[Dict[str, Union[float, Dict[str, float]]]] = None, 

844 cache: Optional[bool] = False, 

845 execution_type: Optional[str] = None, 

846 force_mean: bool = False, 

847 ) -> np.ndarray: 

848 """ 

849 Perform a forward pass of the quantum circuit. 

850 

851 Args: 

852 params (Optional[np.ndarray]): Weight vector of shape 

853 [n_layers, n_qubits*n_params_per_layer]. 

854 If None, model internal parameters are used. 

855 inputs (Optional[np.ndarray]): Input vector of shape [1]. 

856 If None, zeros are used. 

857 noise_params (Optional[Dict[str, float]], optional): The noise parameters. 

858 Defaults to None which results in the last 

859 set noise parameters being used. 

860 cache (Optional[bool], optional): Whether to cache the results. 

861 Defaults to False. 

862 execution_type (str, optional): The type of execution. 

863 Must be one of 'expval', 'density', or 'probs'. 

864 Defaults to None which results in the last set execution type 

865 being used. 

866 force_mean (bool, optional): Whether to average 

867 when performing n-local measurements. 

868 Defaults to False. 

869 

870 Returns: 

871 np.ndarray: The output of the quantum circuit. 

872 The shape depends on the execution_type. 

873 - If execution_type is 'expval', returns an ndarray of shape 

874 (1,) if output_qubit is -1, else (len(output_qubit),). 

875 - If execution_type is 'density', returns an ndarray 

876 of shape (2**n_qubits, 2**n_qubits). 

877 - If execution_type is 'probs', returns an ndarray 

878 of shape (2**n_qubits,) if output_qubit is -1, else 

879 (2**len(output_qubit),). 

880 """ 

881 # Call forward method which handles the actual caching etc. 

882 return self._forward( 

883 params=params, 

884 inputs=inputs, 

885 noise_params=noise_params, 

886 cache=cache, 

887 execution_type=execution_type, 

888 force_mean=force_mean, 

889 ) 

890 

891 def _forward( 

892 self, 

893 params: Optional[np.ndarray] = None, 

894 inputs: Optional[np.ndarray] = None, 

895 noise_params: Optional[Dict[str, Union[float, Dict[str, float]]]] = None, 

896 cache: Optional[bool] = False, 

897 execution_type: Optional[str] = None, 

898 force_mean: bool = False, 

899 ) -> np.ndarray: 

900 """ 

901 Perform a forward pass of the quantum circuit. 

902 

903 Args: 

904 params (Optional[np.ndarray]): Weight vector of shape 

905 [n_layers, n_qubits*n_params_per_layer]. 

906 If None, model internal parameters are used. 

907 inputs (Optional[np.ndarray]): Input vector of shape [1]. 

908 If None, zeros are used. 

909 noise_params (Optional[Dict[str, float]], optional): The noise parameters. 

910 Defaults to None which results in the last 

911 set noise parameters being used. 

912 cache (Optional[bool], optional): Whether to cache the results. 

913 Defaults to False. 

914 execution_type (str, optional): The type of execution. 

915 Must be one of 'expval', 'density', or 'probs'. 

916 Defaults to None which results in the last set execution type 

917 being used. 

918 force_mean (bool, optional): Whether to average 

919 when performing n-local measurements. 

920 Defaults to False. 

921 

922 Returns: 

923 np.ndarray: The output of the quantum circuit. 

924 The shape depends on the execution_type. 

925 - If execution_type is 'expval', returns an ndarray of shape 

926 (1,) if output_qubit is -1, else (len(output_qubit),). 

927 - If execution_type is 'density', returns an ndarray 

928 of shape (2**n_qubits, 2**n_qubits). 

929 - If execution_type is 'probs', returns an ndarray 

930 of shape (2**n_qubits,) if output_qubit is -1, else 

931 (2**len(output_qubit),). 

932 

933 Raises: 

934 NotImplementedError: If the number of shots is not None or if the 

935 expectation value is True. 

936 """ 

937 # set the parameters as object attributes 

938 if noise_params is not None: 

939 self.noise_params = noise_params 

940 if execution_type is not None: 

941 self.execution_type = execution_type 

942 

943 params = self._params_validation(params) 

944 inputs = self._inputs_validation(inputs) 

945 inputs, params, self.batch_shape = self._assimilate_batch(inputs, params) 

946 # the qasm representation contains the bound parameters, 

947 # thus it is ok to hash that 

948 hs = hashlib.md5( 

949 repr( 

950 { 

951 "n_qubits": self.n_qubits, 

952 "n_layers": self.n_layers, 

953 "pqc": self.pqc.__class__.__name__, 

954 "dru": self.data_reupload, 

955 "params": self.params, # use safe-params 

956 "noise_params": self.noise_params, 

957 "execution_type": self.execution_type, 

958 "inputs": inputs, 

959 "output_qubit": self.output_qubit, 

960 } 

961 ).encode("utf-8") 

962 ).hexdigest() 

963 

964 result: Optional[np.ndarray] = None 

965 if cache: 

966 name: str = f"pqc_{hs}.npy" 

967 

968 cache_folder: str = ".cache" 

969 if not os.path.exists(cache_folder): 

970 os.mkdir(cache_folder) 

971 

972 file_path: str = os.path.join(cache_folder, name) 

973 

974 if os.path.isfile(file_path): 

975 result = np.load(file_path) 

976 

977 if result is None: 

978 # if density matrix requested or noise params used 

979 if self.execution_type == "density" or self.noise_params is not None: 

980 result = self._mp_executor( 

981 f=self.circuit_mixed, 

982 params=params, # use arraybox params 

983 inputs=inputs, 

984 ) 

985 else: 

986 if not isinstance(self.circuit, qml.QNode): 

987 result = self.circuit( 

988 inputs=inputs, 

989 ) 

990 else: 

991 result = self._mp_executor( 

992 f=self.circuit, 

993 params=params, # use arraybox params 

994 inputs=inputs, 

995 ) 

996 

997 if isinstance(result, list): 

998 result = np.stack(result) 

999 

1000 if self.execution_type == "expval" and force_mean and self.output_qubit == -1: 

1001 # exception for torch layer because it swaps batch and output dimension 

1002 if not isinstance(self.circuit, qml.QNode): 

1003 result = result.mean(axis=-1) 

1004 else: 

1005 result = result.mean(axis=0) 

1006 elif self.execution_type == "probs" and force_mean and self.output_qubit == -1: 

1007 # exception for torch layer because it swaps batch and output dimension 

1008 if not isinstance(self.circuit, qml.QNode): 

1009 result = result[..., -1].sum(axis=-1) 

1010 else: 

1011 result = result[1:, ...].sum(axis=0) 

1012 

1013 if self.batch_shape[0] > 1 and self.batch_shape[1] > 1: 

1014 result = result.reshape(-1, *self.batch_shape) 

1015 

1016 result = result.squeeze() 

1017 

1018 if cache: 

1019 np.save(file_path, result) 

1020 

1021 return result 

1022 

1023 def get_specs(self, inputs: Optional[np.ndarray] = None) -> dict: 

1024 """ 

1025 Get pennylane specs for the model. 

1026 

1027 Args: 

1028 inputs (Optional[np.ndarray]): The inputs, with which to call the 

1029 circuit. Defaults to None. 

1030 

1031 Returns: 

1032 dict: Dictionary of specs. The key "resources" contains information 

1033 about the circuit size and gate statistics. 

1034 """ 

1035 inputs = self._inputs_validation(inputs) 

1036 spec_model = deepcopy(self) 

1037 spec_model.noise_params = None # remove noise 

1038 return qml.specs(spec_model.circuit)(self.params, inputs) 

1039 

1040 def get_circuit_depth(self, inputs: Optional[np.ndarray] = None) -> int: 

1041 """ 

1042 Obtain circuit depth for the model 

1043 

1044 Args: 

1045 inputs (Optional[np.ndarray]): The inputs, with which to call the 

1046 circuit. Defaults to None. 

1047 

1048 Returns: 

1049 int: Circuit depth (longest path of gates in circuit.) 

1050 """ 

1051 return self.get_specs(inputs)["resources"].depth