Coverage for qml_essentials/model.py: 93%
404 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-09-08 14:29 +0000
« prev ^ index » next coverage.py v7.9.2, created at 2025-09-08 14:29 +0000
1from typing import Dict, Optional, Tuple, Callable, Union, List
2import pennylane as qml
3import pennylane.numpy as np
4import hashlib
5import os
6import warnings
7from autograd.numpy import numpy_boxes
8from copy import deepcopy
9import math
11from qml_essentials.ansaetze import Gates, Ansaetze, Circuit
12from qml_essentials.utils import PauliCircuit, QuanTikz, MultiprocessingPool
15import logging
17log = logging.getLogger(__name__)
20class Model:
21 """
22 A quantum circuit model.
23 """
25 lightning_threshold = 12
26 cpu_scaler = 0.9 # default cpu scaler, =1 means full CPU for MP
28 def __init__(
29 self,
30 n_qubits: int,
31 n_layers: int,
32 circuit_type: Union[str, Circuit] = "No_Ansatz",
33 data_reupload: Union[bool, List[bool], List[List[bool]]] = True,
34 state_preparation: Union[str, Callable, List[str], List[Callable]] = None,
35 encoding: Union[str, Callable, List[str], List[Callable]] = Gates.RX,
36 trainable_frequencies: bool = False,
37 initialization: str = "random",
38 initialization_domain: List[float] = [0, 2 * np.pi],
39 output_qubit: Union[List[int], int] = -1,
40 shots: Optional[int] = None,
41 random_seed: int = 1000,
42 as_pauli_circuit: bool = False,
43 remove_zero_encoding: bool = True,
44 mp_threshold: int = -1,
45 ) -> None:
46 """
47 Initialize the quantum circuit model.
48 Parameters will have the shape [impl_n_layers, parameters_per_layer]
49 where impl_n_layers is the number of layers provided and added by one
50 depending if data_reupload is True and parameters_per_layer is given by
51 the chosen ansatz.
53 The model is initialized with the following parameters as defaults:
54 - noise_params: None
55 - execution_type: "expval"
56 - shots: None
58 Args:
59 n_qubits (int): The number of qubits in the circuit.
60 n_layers (int): The number of layers in the circuit.
61 circuit_type (str, Circuit): The type of quantum circuit to use.
62 If None, defaults to "no_ansatz".
63 data_reupload (Union[bool, List[bool], List[List[bool]]], optional):
64 Whether to reupload data to the quantum device on each
65 layer and qubit. Detailed re-uploading instructions can be given
66 as a list/array of 0/False and 1/True with shape (n_qubits,
67 n_layers) to specify where to upload the data. Defaults to True
68 for applying data re-uploading to the full circuit.
69 encoding (Union[str, Callable, List[str], List[Callable]], optional):
70 The unitary to use for encoding the input data. Can be a string
71 (e.g. "RX") or a callable (e.g. qml.RX). Defaults to qml.RX.
72 If input is multidimensional it is assumed to be a list of
73 unitaries or a list of strings.
74 trainable_frequencies (bool, optional):
75 Sets trainable encoding parameters for trainable frequencies.
76 Defaults to False.
77 initialization (str, optional): The strategy to initialize the parameters.
78 Can be "random", "zeros", "zero-controlled", "pi", or "pi-controlled".
79 Defaults to "random".
80 output_qubit (List[int], int, optional): The index of the output
81 qubit (or qubits). When set to -1 all qubits are measured, or a
82 global measurement is conducted, depending on the execution
83 type.
84 shots (Optional[int], optional): The number of shots to use for
85 the quantum device. Defaults to None.
86 random_seed (int, optional): seed for the random number generator
87 in initialization is "random" and for random noise parameters.
88 Defaults to 1000.
89 as_pauli_circuit (bool, optional): whether the circuit is
90 transformed to a Pauli-Clifford circuit as described by Nemkov
91 et al. (https://doi.org/10.1103/PhysRevA.108.032406), which is
92 required for analytical Fourier coefficient computation.
93 Defaults to False.
94 remove_zero_encoding (bool, optional): whether to
95 remove the zero encoding from the circuit. Defaults to True.
96 mp_threshold (int, optional): threshold above which the parameter
97 batch dimension is split across multiple processes.
98 Defaults to -1.
100 Returns:
101 None
102 """
103 # Initialize default parameters needed for circuit evaluation
104 self.noise_params: Optional[Dict[str, Union[float, Dict[str, float]]]] = None
105 self.execution_type: Optional[str] = "expval"
106 self.shots = shots
107 self.remove_zero_encoding = remove_zero_encoding
108 self.mp_threshold = mp_threshold
109 self.n_qubits: int = n_qubits
110 self.n_layers: int = n_layers
111 self.trainable_frequencies: bool = trainable_frequencies
113 if isinstance(output_qubit, list):
114 assert (
115 len(output_qubit) <= n_qubits
116 ), f"Size of output_qubit {len(output_qubit)} cannot be\
117 larger than number of qubits {n_qubits}."
118 self.output_qubit: Union[List[int], int] = output_qubit
120 # Initialize rng in Gates
121 Gates.init_rng(random_seed)
123 # --- State Preparation ---
124 # first check if we have a str, list or callable
125 if isinstance(state_preparation, str):
126 # if str, use the pennylane fct
127 self._sp = [getattr(Gates, f"{state_preparation}")]
128 elif isinstance(state_preparation, list):
129 # if list, check if str or callable
130 if isinstance(state_preparation[0], str):
131 self._sp = [getattr(Gates, f"{sp}") for sp in state_preparation]
132 else:
133 self._sp = state_preparation
134 elif state_preparation is None:
135 self._sp = [lambda *args, **kwargs: None]
136 else:
137 # default to callable
138 self._sp = [state_preparation]
140 # --- Encoding ---
141 # first check if we have a str, list or callable
142 if isinstance(encoding, str):
143 # if str, use the pennylane fct
144 self._enc = [getattr(Gates, f"{encoding}")]
145 elif isinstance(encoding, list):
146 # if list, check if str or callable
147 if isinstance(encoding[0], str):
148 self._enc = [getattr(Gates, f"{enc}") for enc in encoding]
149 else:
150 self._enc = encoding
151 else:
152 # default to callable
153 self._enc = [encoding]
155 # Number of possible inputs
156 self.n_input_feat = len(self._enc)
157 log.info(f"Number of input features: {self.n_input_feat}")
159 # Trainable frequencies, default initialization as in arXiv:2309.03279v2
160 self.enc_params = np.ones(
161 (self.n_qubits, self.n_input_feat), requires_grad=trainable_frequencies
162 )
164 # --- Data-Reuploading ---
165 # Process data reuploading strategy and set degree
166 if not isinstance(data_reupload, bool):
167 if not isinstance(data_reupload, np.ndarray):
168 data_reupload = np.array(data_reupload)
169 if data_reupload.shape == (
170 n_layers,
171 n_qubits,
172 ):
173 data_reupload = data_reupload.reshape(*data_reupload.shape, 1)
174 data_reupload = np.repeat(data_reupload, self.n_input_feat, axis=2)
176 assert data_reupload.shape == (
177 n_layers,
178 n_qubits,
179 self.n_input_feat,
180 ), f"Data reuploading array has wrong shape. \
181 Expected {(n_layers, n_qubits)} or\
182 {(n_layers, n_qubits, self.n_input_feat)},\
183 got {data_reupload.shape}."
185 log.debug(f"Data reuploading array:\n{data_reupload}")
186 else:
187 if data_reupload:
188 impl_n_layers: int = (
189 n_layers + 1
190 ) # we need L+1 according to Schuld et al.
191 data_reupload = np.ones((n_layers, n_qubits, self.n_input_feat))
192 log.debug("Full data reuploading.")
193 else:
194 impl_n_layers: int = n_layers
195 data_reupload = np.zeros((n_layers, n_qubits, self.n_input_feat))
196 data_reupload[0][0] = 1
197 log.debug("No data reuploading.")
199 # convert to boolean values
200 self.data_reupload = data_reupload.astype(bool)
201 self.frequencies = [
202 np.count_nonzero(self.data_reupload[..., i])
203 for i in range(self.n_input_feat)
204 ]
206 if self.degree > 1:
207 impl_n_layers: int = n_layers + 1 # we need L+1 according to Schuld et al.
208 else:
209 impl_n_layers = n_layers
210 log.info(f"Number of implicit layers: {impl_n_layers}.")
212 # --- Ansatz ---
213 # only weak check for str. We trust the user to provide sth useful
214 if isinstance(circuit_type, str):
215 self.pqc: Callable[[Optional[np.ndarray], int], int] = getattr(
216 Ansaetze, circuit_type or "No_Ansatz"
217 )()
218 else:
219 self.pqc = circuit_type()
220 log.info(f"Using Ansatz {circuit_type}.")
222 # calculate the shape of the parameter vector here, we will re-use this in init.
223 params_per_layer = self.pqc.n_params_per_layer(self.n_qubits)
224 self._params_shape: Tuple[int, int] = (impl_n_layers, params_per_layer)
225 log.info(f"Parameters per layer: {params_per_layer}")
227 self.batch_shape = (1, 1)
228 # this will also be re-used in the init method,
229 # however, only if nothing is provided
230 self._inialization_strategy = initialization
231 self._initialization_domain = initialization_domain
233 # ..here! where we only require a rng
234 self.initialize_params(np.random.default_rng(random_seed))
236 # Initialize two circuits, one with the default device and
237 # one with the mixed device
238 # which allows us to later route depending on the state_vector flag
239 self.as_pauli_circuit = as_pauli_circuit
241 self.circuit_mixed: qml.QNode = qml.QNode(
242 self._circuit,
243 qml.device("default.mixed", shots=self.shots, wires=self.n_qubits),
244 interface="autograd" if self.shots is not None else "auto",
245 diff_method="parameter-shift" if self.shots is not None else "best",
246 )
    @property
    def degree(self) -> int:
        """
        Gets the degree of the model, i.e. the largest entry of
        `self.frequencies` (the per-feature count of active encodings).

        Returns:
            int: The maximum over all input features.
        """
        return max(self.frequencies)
    @property
    def as_pauli_circuit(self) -> bool:
        """
        Gets the flag that was last assigned to `as_pauli_circuit`.

        Returns:
            bool: The stored flag.
        """
        return self._as_pauli_circuit
256 @as_pauli_circuit.setter
257 def as_pauli_circuit(self, value: bool) -> None:
258 self._as_pauli_circuit = value
260 if self.n_qubits < self.lightning_threshold:
261 device = "default.qubit"
262 else:
263 device = "lightning.qubit"
264 self.mp_threshold = -1
266 self.circuit: qml.QNode = qml.QNode(
267 self._circuit,
268 qml.device(
269 device,
270 shots=self.shots,
271 wires=self.n_qubits,
272 ),
273 interface="autograd" if self.shots is not None else "auto",
274 diff_method="parameter-shift" if self.shots is not None else "best",
275 )
277 if value:
278 pauli_circuit_transform = qml.transform(
279 PauliCircuit.from_parameterised_circuit
280 )
281 self.circuit = pauli_circuit_transform(self.circuit)
    @property
    def noise_params(self) -> Optional[Dict[str, Union[float, Dict[str, float]]]]:
        """
        Gets the noise parameters of the model.

        Returns:
            Optional[Dict[str, Union[float, Dict[str, float]]]]: The
                dictionary stored by the setter, or None if not set.
        """
        return self._noise_params
294 @noise_params.setter
295 def noise_params(
296 self, kvs: Optional[Dict[str, Union[float, Dict[str, float]]]]
297 ) -> None:
298 """
299 Sets the noise parameters of the model.
301 Typically a "noise parameter" refers to the error probability.
302 ThermalRelaxation is a special case, and supports a dict as value with
303 structure:
304 "ThermalRelaxation":
305 {
306 "t1": 2000, # relative t1 time.
307 "t2": 1000, # relative t2 time
308 "t_factor" 1: # relative gate time factor
309 },
311 Args:
312 kvs (Optional[Dict[str, Union[float, Dict[str, float]]]]): A
313 dictionary of noise parameters. If all values are 0.0, the noise
314 parameters are set to None.
316 Returns:
317 None
318 """
319 # set to None if only zero values provided
320 if kvs is not None and all(np == 0.0 for np in kvs.values()):
321 kvs = None
323 # set default values
324 if kvs is not None:
325 kvs.setdefault("BitFlip", 0.0)
326 kvs.setdefault("PhaseFlip", 0.0)
327 kvs.setdefault("Depolarizing", 0.0)
328 kvs.setdefault("MultiQubitDepolarizing", 0.0)
329 kvs.setdefault("AmplitudeDamping", 0.0)
330 kvs.setdefault("PhaseDamping", 0.0)
331 kvs.setdefault("GateError", 0.0)
332 kvs.setdefault("ThermalRelaxation", None)
333 kvs.setdefault("StatePreparation", 0.0)
334 kvs.setdefault("Measurement", 0.0)
336 # check if there are any keys not supported
337 for key in kvs.keys():
338 if key not in [
339 "BitFlip",
340 "PhaseFlip",
341 "Depolarizing",
342 "MultiQubitDepolarizing",
343 "AmplitudeDamping",
344 "PhaseDamping",
345 "GateError",
346 "ThermalRelaxation",
347 "StatePreparation",
348 "Measurement",
349 ]:
350 warnings.warn(
351 f"Noise type {key} is not supported by this package",
352 UserWarning,
353 )
355 # check valid params for thermal relaxation noise channel
356 tr_params = kvs["ThermalRelaxation"]
357 if isinstance(tr_params, dict):
358 tr_params.setdefault("t1", 0.0)
359 tr_params.setdefault("t2", 0.0)
360 tr_params.setdefault("t_factor", 0.0)
361 for k in tr_params.keys():
362 if k not in [
363 "t1",
364 "t2",
365 "t_factor",
366 ]:
367 warnings.warn(
368 f"Thermal Relaxation parameter {k} is not supported "
369 f"by this package",
370 UserWarning,
371 )
372 if not all(tr_params.values()) or tr_params["t2"] > 2 * tr_params["t1"]:
373 warnings.warn(
374 "Received invalid values for Thermal Relaxation noise "
375 "parameter. Thermal relaxation is not applied!",
376 UserWarning,
377 )
378 kvs["ThermalRelaxation"] = 0.0
380 self._noise_params = kvs
    @property
    def execution_type(self) -> str:
        """
        Gets the execution type of the model.

        Returns:
            str: The execution type, one of 'density', 'state', 'expval',
                or 'probs' (the values accepted by the setter).
        """
        return self._execution_type
392 @execution_type.setter
393 def execution_type(self, value: str) -> None:
394 if value not in ["density", "state", "expval", "probs"]:
395 raise ValueError(f"Invalid execution type: {value}.")
397 if (value == "density" or value == "state") and self.output_qubit != -1:
398 warnings.warn(
399 f"{value} measurement does ignore output_qubit, which is "
400 f"{self.output_qubit}.",
401 UserWarning,
402 )
404 if value == "probs" and self.shots is None:
405 warnings.warn(
406 "Setting execution_type to probs without specifying shots.",
407 UserWarning,
408 )
410 if value == "density" and self.shots is not None:
411 warnings.warn(
412 "Setting execution_type to density with specified shots.",
413 UserWarning,
414 )
416 self._execution_type = value
    @property
    def shots(self) -> Optional[int]:
        """
        Gets the number of shots to use for the quantum device.

        Returns:
            Optional[int]: The number of shots as stored by the setter
                (None if a non-positive integer was assigned).
        """
        return self._shots
428 @shots.setter
429 def shots(self, value: Optional[int]) -> None:
430 """
431 Sets the number of shots to use for the quantum device.
433 Args:
434 value (Optional[int]): The number of shots.
435 If an integer less than or equal to 0 is provided, it is set to None.
437 Returns:
438 None
439 """
440 if type(value) is int and value <= 0:
441 value = None
442 self._shots = value
444 def initialize_params(
445 self,
446 rng: np.random.Generator,
447 repeat: int = None,
448 initialization: str = None,
449 initialization_domain: List[float] = None,
450 ) -> None:
451 """
452 Initializes the parameters of the model.
454 Args:
455 rng: A random number generator to use for initialization.
456 repeat: The number of times to repeat the parameters.
457 If None, the number of layers is used.
458 initialization: The strategy to use for parameter initialization.
459 If None, the strategy specified in the constructor is used.
460 initialization_domain: The domain to use for parameter initialization.
461 If None, the domain specified in the constructor is used.
463 Returns:
464 None
465 """
466 params_shape = (
467 self._params_shape if repeat is None else [*self._params_shape, repeat]
468 )
469 # use existing strategy if not specified
470 initialization = initialization or self._inialization_strategy
471 initialization_domain = initialization_domain or self._initialization_domain
473 def set_control_params(params: np.ndarray, value: float) -> np.ndarray:
474 indices = self.pqc.get_control_indices(self.n_qubits)
475 if indices is None:
476 warnings.warn(
477 f"Specified {initialization} but circuit\
478 does not contain controlled rotation gates.\
479 Parameters are intialized randomly.",
480 UserWarning,
481 )
482 else:
483 params[:, indices[0] : indices[1] : indices[2]] = (
484 np.ones_like(params[:, indices[0] : indices[1] : indices[2]])
485 * value
486 )
487 return params
489 if initialization == "random":
490 self.params: np.ndarray = rng.uniform(
491 *initialization_domain, params_shape, requires_grad=True
492 )
493 elif initialization == "zeros":
494 self.params: np.ndarray = np.zeros(params_shape, requires_grad=True)
495 elif initialization == "pi":
496 self.params: np.ndarray = np.ones(params_shape, requires_grad=True) * np.pi
497 elif initialization == "zero-controlled":
498 self.params: np.ndarray = rng.uniform(
499 *initialization_domain, params_shape, requires_grad=True
500 )
501 self.params = set_control_params(self.params, 0)
502 elif initialization == "pi-controlled":
503 self.params: np.ndarray = rng.uniform(
504 *initialization_domain, params_shape, requires_grad=True
505 )
506 self.params = set_control_params(self.params, np.pi)
507 else:
508 raise Exception("Invalid initialization method")
510 log.info(
511 f"Initialized parameters with shape {self.params.shape}\
512 using strategy {initialization}."
513 )
    def transform_input(self, inputs: np.ndarray, enc_params: Optional[np.ndarray]):
        """
        Transforms the input as in arXiv:2309.03279v2

        Args:
            inputs (np.ndarray): the input value(s) to scale; `_iec` passes
                one feature column of the input batch.
            enc_params (np.ndarray): the encoding weight(s) to scale with;
                `_iec` passes the single weight of one qubit/feature pair.

        Returns:
            np.ndarray: the product `inputs * enc_params`, i.e. the input
                linearly scaled by the encoding weight, ready for encoding
        """
        return inputs * enc_params
532 def _iec(
533 self,
534 inputs: np.ndarray,
535 data_reupload: np.ndarray,
536 enc: Union[Callable, List[Callable]],
537 enc_params: np.ndarray,
538 noise_params: Optional[Dict[str, Union[float, Dict[str, float]]]] = None,
539 ) -> None:
540 """
541 Creates an AngleEncoding using RX gates
543 Args:
544 inputs (np.ndarray): single input point of shape (1, n_input_feat)
545 data_reupload (np.ndarray): Boolean array to indicate positions in
546 the circuit for data re-uploading for the IEC, shape is
547 (n_qubits, n_layers).
548 enc: Callable or List[Callable]: encoding function or list of encoding
549 functions
550 enc_params (np.ndarray): encoding weight vector
551 of shape [n_qubits, n_inputs]
552 noise_params (Optional[Dict[str, Union[float, Dict[str, float]]]]):
553 The noise parameters.
554 Returns:
555 None
556 """
557 # check for zero, because due to input validation, input cannot be none
558 if self.remove_zero_encoding and not inputs.any():
559 return
561 for q in range(self.n_qubits):
562 for idx in range(inputs.shape[1]):
563 if data_reupload[q, idx]:
564 enc[idx](
565 self.transform_input(inputs[:, idx], enc_params[q, idx]),
566 wires=q,
567 noise_params=noise_params,
568 )
570 def _circuit(
571 self,
572 params: np.ndarray,
573 inputs: np.ndarray,
574 enc_params: Optional[np.ndarray] = None,
575 ) -> Union[float, np.ndarray]:
576 """
577 Creates a circuit with noise.
579 Args:
580 params (np.ndarray): weight vector of shape
581 [n_layers, n_qubits*(n_params_per_layer+trainable_frequencies)]
582 inputs (np.ndarray): input vector of size 1
583 enc_params Optional[np.ndarray]: encoding weight vector
584 of shape [n_qubits, n_inputs]
585 Returns:
586 Union[float, np.ndarray]: Expectation value of PauliZ(0)
587 of the circuit if state_vector is False and expval is True,
588 otherwise the density matrix of all qubits.
589 """
591 self._variational(params=params, inputs=inputs, enc_params=enc_params)
592 return self._observable()
594 def _variational(self, params, inputs, enc_params=None):
595 if enc_params is None:
596 warnings.warn(
597 "Explicit call to `_circuit` or `_variational` detected: "
598 "`enc_params` is None, using `self.enc_params` instead.",
599 RuntimeWarning,
600 )
601 enc_params = self.enc_params
603 if self.noise_params is not None:
604 self._apply_state_prep_noise()
606 # state preparation
607 for q in range(self.n_qubits):
608 for _sp in self._sp:
609 _sp(wires=q, noise_params=self.noise_params)
611 # circuit building
612 for layer in range(0, self.n_layers):
613 # ansatz layers
614 self.pqc(params[layer], self.n_qubits, noise_params=self.noise_params)
616 # encoding layers
617 self._iec(
618 inputs,
619 data_reupload=self.data_reupload[layer],
620 enc=self._enc,
621 enc_params=enc_params,
622 noise_params=self.noise_params,
623 )
625 # visual barrier
626 if self.degree > 1:
627 qml.Barrier(wires=list(range(self.n_qubits)), only_visual=True)
629 # final ansatz layer
630 if self.degree > 1: # same check as in init
631 self.pqc(params[-1], self.n_qubits, noise_params=self.noise_params)
633 # channel noise
634 if self.noise_params is not None:
635 self._apply_general_noise()
637 def _observable(self):
638 # run mixed simualtion and get density matrix
639 if self.execution_type == "density":
640 return qml.density_matrix(wires=list(range(self.n_qubits)))
641 elif self.execution_type == "state":
642 return qml.state()
643 # run default simulation and get expectation value
644 elif self.execution_type == "expval":
645 # n-local measurement
646 if self.output_qubit == -1:
647 return [qml.expval(qml.PauliZ(q)) for q in range(self.n_qubits)]
648 # local measurement(s)
649 elif isinstance(self.output_qubit, int):
650 return qml.expval(qml.PauliZ(self.output_qubit))
651 # parity measurenment
652 elif isinstance(self.output_qubit, list):
653 obs = qml.PauliZ(self.output_qubit[0])
654 for out_qubit in self.output_qubit[1:]:
655 obs = obs @ qml.PauliZ(out_qubit)
656 return qml.expval(obs)
657 else:
658 raise ValueError(
659 f"Invalid parameter `output_qubit`: {self.output_qubit}.\
660 Must be int, list or -1."
661 )
662 # run default simulation and get probs
663 elif self.execution_type == "probs":
664 if self.output_qubit == -1:
665 return qml.probs(wires=list(range(self.n_qubits)))
666 else:
667 return qml.probs(wires=self.output_qubit)
668 else:
669 raise ValueError(f"Invalid execution_type: {self.execution_type}.")
671 def _apply_state_prep_noise(self) -> None:
672 """
673 Applies a state preparation error on each qubit according to the
674 probability for StatePreparation provided in the noise_params.
675 """
676 p = self.noise_params.get("StatePreparation", 0.0)
677 for q in range(self.n_qubits):
678 if p > 0:
679 qml.BitFlip(p, wires=q)
681 def _apply_general_noise(self) -> None:
682 """
683 Applies general types of noise the full circuit (in contrast to gate
684 errors, applied directly at gate level, see Gates.Noise).
686 Possible types of noise are:
687 - AmplitudeDamping (specified through probability)
688 - PhaseDamping (specified through probability)
689 - ThermalRelaxation (specified through a dict, containing keys
690 "t1", "t2", "t_factor")
691 - Measurement (specified through probability)
692 """
693 amp_damp = self.noise_params.get("AmplitudeDamping", 0.0)
694 phase_damp = self.noise_params.get("PhaseDamping", 0.0)
695 thermal_relax = self.noise_params.get("ThermalRelaxation", 0.0)
696 meas = self.noise_params.get("Measurement", 0.0)
697 for q in range(self.n_qubits):
698 if amp_damp > 0:
699 qml.AmplitudeDamping(amp_damp, wires=q)
700 if phase_damp > 0:
701 qml.PhaseDamping(phase_damp, wires=q)
702 if meas > 0:
703 qml.BitFlip(meas, wires=q)
704 if isinstance(thermal_relax, dict):
705 t1 = thermal_relax["t1"]
706 t2 = thermal_relax["t2"]
707 t_factor = thermal_relax["t_factor"]
708 circuit_depth = self._get_circuit_depth()
709 tg = circuit_depth * t_factor
710 qml.ThermalRelaxationError(1.0, t1, t2, tg, q)
712 def _get_circuit_depth(self, inputs: Optional[np.ndarray] = None) -> int:
713 """
714 Obtain circuit depth for the model
716 Args:
717 inputs (Optional[np.ndarray]): The inputs, with which to call the
718 circuit. Defaults to None.
720 Returns:
721 int: Circuit depth (longest path of gates in circuit.)
722 """
723 inputs = self._inputs_validation(inputs)
724 spec_model = deepcopy(self)
725 spec_model.noise_params = None # remove noise
726 specs = qml.specs(spec_model.circuit)(self.params, inputs)
728 return specs["resources"].depth
730 def draw(self, inputs=None, figure="text", *args, **kwargs):
731 """
732 Draws the quantum circuit using the specified visualization method.
734 Args:
735 inputs (Optional[np.ndarray]): Input vector for the circuit. If None,
736 the default inputs are used.
737 figure (str, optional): The type of figure to generate. Must be one of
738 'text', 'mpl', or 'tikz'. Defaults to 'text'.
739 Returns:
740 Either a string, matplotlib figure or TikzFigure object (similar to string)
741 depending on the chosen visualization.
742 *args:
743 Additional arguments to be passed to the visualization method.
744 **kwargs:
745 Additional keyword arguments to be passed to the visualization method.
747 Raises:
748 AssertionError: If the 'figure' argument is not one of the accepted values.
749 """
751 if not isinstance(self.circuit, qml.QNode):
752 # TODO: throws strange argument error if not catched
753 return ""
755 assert figure in [
756 "text",
757 "mpl",
758 "tikz",
759 ], f"Invalid figure: {figure}. Must be 'text', 'mpl' or 'tikz'."
761 inputs = self._inputs_validation(inputs)
763 if figure == "mpl":
764 result = qml.draw_mpl(self.circuit)(
765 params=self.params,
766 inputs=inputs,
767 enc_params=self.enc_params,
768 *args,
769 **kwargs,
770 )
771 elif figure == "tikz":
772 result = QuanTikz.build(
773 self.circuit,
774 params=self.params,
775 inputs=inputs,
776 enc_params=self.enc_params,
777 *args,
778 **kwargs,
779 )
780 else:
781 result = qml.draw(self.circuit)(
782 params=self.params, inputs=inputs, enc_params=self.enc_params
783 )
784 return result
    def __repr__(self) -> str:
        """Return the text drawing of the circuit, see `draw`."""
        return self.draw(figure="text")
    def __str__(self) -> str:
        """Return the text drawing of the circuit, see `draw`."""
        return self.draw(figure="text")
792 def _params_validation(self, params) -> np.ndarray:
793 """
794 Sets the parameters when calling the quantum circuit
796 Args:
797 params (np.ndarray): The parameters used for the call
798 """
799 if params is None:
800 params = self.params
801 else:
802 if numpy_boxes.ArrayBox == type(params):
803 self.params = params._value
804 else:
805 self.params = params
807 # Get rid of extra dimension
808 if len(params.shape) == 3 and params.shape[2] == 1:
809 params = params[:, :, 0]
811 return params
813 def _enc_params_validation(self, enc_params) -> np.ndarray:
814 """
815 Sets the encoding parameters when calling the quantum circuit
817 Args:
818 enc_params (np.ndarray): The encoding parameters used for the call
819 """
820 if enc_params is None:
821 enc_params = self.enc_params
822 else:
823 if isinstance(enc_params, numpy_boxes.ArrayBox):
824 if self.trainable_frequencies:
825 self.enc_params = enc_params._value
826 else:
827 self.enc_params = np.array(
828 enc_params._value, requires_grad=self.trainable_frequencies
829 )
830 else:
831 if self.trainable_frequencies:
832 self.enc_params = enc_params
833 else:
834 self.enc_params = np.array(
835 enc_params, requires_grad=self.trainable_frequencies
836 )
838 if len(enc_params.shape) == 1 and self.n_input_feat == 1:
839 enc_params = enc_params.reshape(-1, 1)
840 elif len(enc_params.shape) == 1 and self.n_input_feat > 1:
841 raise ValueError(
842 f"Input dimension {self.n_input_feat} >1 but \
843 `enc_params` has shape {enc_params.shape}"
844 )
846 return enc_params
848 def _inputs_validation(
849 self, inputs: Union[None, List, float, int, np.ndarray]
850 ) -> np.ndarray:
851 """
852 Validate the inputs to be a 2D numpy array of shape (batch_size, n_inputs).
854 Args:
855 inputs (Union[None, List, float, int, np.ndarray]): The input to validate.
857 Returns:
858 np.ndarray: The validated input.
859 """
860 if inputs is None:
861 # initialize to zero
862 inputs = np.array([[0] * self.n_input_feat])
863 elif isinstance(inputs, List):
864 inputs = np.stack(inputs)
865 elif isinstance(inputs, float) or isinstance(inputs, int):
866 inputs = np.array([inputs])
868 if len(inputs.shape) <= 1:
869 if self.n_input_feat == 1:
870 # add a batch dimension
871 inputs = inputs.reshape(-1, 1)
872 else:
873 if inputs.shape[0] == self.n_input_feat:
874 inputs = inputs.reshape(1, -1)
875 else:
876 inputs = inputs.reshape(-1, 1)
877 inputs = inputs.repeat(self.n_input_feat, axis=1)
878 warnings.warn(
879 f"Expected {self.n_input_feat} inputs, but {inputs.shape[0]} "
880 "was provided, replicating input for all input features.",
881 UserWarning,
882 )
883 else:
884 if inputs.shape[1] != self.n_input_feat:
885 raise ValueError(
886 f"Wrong number of inputs provided. Expected {self.n_input_feat} "
887 f"inputs, but input has shape {inputs.shape}."
888 )
890 return inputs
892 @staticmethod
893 def _parallel_f(
894 procnum,
895 result,
896 f,
897 batch_size,
898 params,
899 inputs,
900 batch_shape,
901 enc_params,
902 ):
903 """
904 Helper function for parallelizing a function f over parameters.
905 Sices the batch dimension based on the procnum and batch size.
907 Args:
908 procnum: The process number.
909 result: The result array.
910 f: The function to be parallelized.
911 batch_size: The batch size.
912 params: The parameters array.
913 inputs: The inputs array.
914 enc_params: The encoding parameters array.
915 """
916 min_idx = max(procnum * batch_size, 0)
918 if batch_shape[0] > 1:
919 max_idx = min((procnum + 1) * batch_size, inputs.shape[0])
920 inputs = inputs[min_idx:max_idx]
921 if batch_shape[1] > 1:
922 max_idx = min((procnum + 1) * batch_size, params.shape[2])
923 params = params[:, :, min_idx:max_idx]
925 result[procnum] = f(params=params, inputs=inputs, enc_params=enc_params)
927 def _mp_executor(self, f, params, inputs, enc_params):
928 """
929 Execute a function f in parallel over parameters.
931 Args:
932 f: A function that takes two arguments, params and inputs,
933 and returns a numpy array.
934 params: A 3D numpy array of parameters where the first dimension is
935 the layer index, the second dimension is the parameter index in
936 the layer, and the third dimension is the sample index.
937 inputs: A 2D numpy array of inputs where the first dimension is
938 the sample index and the second dimension is the input feature index.
939 enc_params: A 1D numpy array of encoding parameters where the dimension is
940 the qubit index.
942 Returns:
943 A numpy array of the output of f applied to each batch of
944 samples in params, enc_params, and inputs.
945 """
946 n_processes = 1
947 # batches available?
948 combined_batch_size = math.prod(self.batch_shape)
949 if (
950 combined_batch_size > 1
951 and self.mp_threshold > 0
952 and combined_batch_size > self.mp_threshold
953 ):
954 n_processes = math.ceil(combined_batch_size / self.mp_threshold)
955 # check if single process
956 if n_processes == 1:
957 if self.mp_threshold > 0:
958 warnings.warn(
959 f"Multiprocessing threshold {self.mp_threshold}>0, but using \
960 single process, because {combined_batch_size} samples per batch.",
961 )
962 result = f(params=params, inputs=inputs, enc_params=enc_params)
963 else:
964 log.info(f"Using {n_processes} processes")
965 mpp = MultiprocessingPool(
966 target=Model._parallel_f,
967 n_processes=n_processes,
968 cpu_scaler=self.cpu_scaler,
969 batch_size=self.mp_threshold,
970 f=f,
971 params=params,
972 enc_params=enc_params,
973 inputs=inputs,
974 batch_shape=self.batch_shape,
975 )
976 return_dict = mpp.spawn()
978 # TODO: the following code could use some optimization
979 result = [None] * len(return_dict)
980 for k, v in return_dict.items():
981 result[k] = v
983 result = np.concat(result, axis=1 if self.execution_type == "expval" else 0)
984 return result
986 def _assimilate_batch(self, inputs, params):
987 batch_shape = (
988 inputs.shape[0],
989 params.shape[2] if len(params.shape) == 3 else 1,
990 )
992 if (
993 batch_shape[1] != 1
994 and batch_shape[0] != batch_shape[1]
995 and batch_shape[0] > 1
996 ):
997 # the following code does some dirty reshaping
998 # TODO: optimize but be aware of the rabbit hole
999 # key is to get the right "order" in which we repeat
1001 # [BI,D] -> [BPxBI,D]
1002 inputs = np.repeat(inputs, batch_shape[1], axis=0)
1004 # this is a tricky one, essentially we want to get
1005 # [L,Q,BP] -> [L,Q,BI,BP] -> [L,Q,BPxBI]
1006 params = np.repeat(
1007 params[:, :, np.newaxis, :], batch_shape[0], axis=2
1008 ).reshape([*params.shape[:-1], np.prod(batch_shape)])
1010 return inputs, params, batch_shape
1012 def _requires_density(self):
1013 """
1014 Checks if the current model requires density matrix simulation or not
1015 based on the noise_params variable and the execution type
1017 Returns:
1018 bool: True if model requires density simulation
1019 """
1020 if self.execution_type == "density":
1021 return True
1023 if self.noise_params is not None:
1024 coherent_noise = ["GateError"]
1025 for k, v in self.noise_params.items():
1026 if k in coherent_noise:
1027 continue
1028 if v is not None and v > 0:
1029 return True
1030 return False
1032 def __call__(
1033 self,
1034 params: Optional[np.ndarray] = None,
1035 inputs: Optional[np.ndarray] = None,
1036 enc_params: Optional[np.ndarray] = None,
1037 noise_params: Optional[Dict[str, Union[float, Dict[str, float]]]] = None,
1038 cache: Optional[bool] = False,
1039 execution_type: Optional[str] = None,
1040 force_mean: bool = False,
1041 ) -> np.ndarray:
1042 """
1043 Perform a forward pass of the quantum circuit.
1045 Args:
1046 params (Optional[np.ndarray]): Weight vector of shape
1047 [n_layers, n_qubits*n_params_per_layer].
1048 If None, model internal parameters are used.
1049 inputs (Optional[np.ndarray]): Input vector of shape [1].
1050 If None, zeros are used.
1051 enc_params (Optional[np.ndarray]): Weight vector of shape
1052 [n_qubits, n_input_features]. If None, model internal encoding
1053 parameters are used.
1054 noise_params (Optional[Dict[str, float]], optional): The noise parameters.
1055 Defaults to None which results in the last
1056 set noise parameters being used.
1057 cache (Optional[bool], optional): Whether to cache the results.
1058 Defaults to False.
1059 execution_type (str, optional): The type of execution.
1060 Must be one of 'expval', 'density', or 'probs'.
1061 Defaults to None which results in the last set execution type
1062 being used.
1063 force_mean (bool, optional): Whether to average
1064 when performing n-local measurements.
1065 Defaults to False.
1067 Returns:
1068 np.ndarray: The output of the quantum circuit.
1069 The shape depends on the execution_type.
1070 - If execution_type is 'expval', returns an ndarray of shape
1071 (1,) if output_qubit is -1, else (len(output_qubit),).
1072 - If execution_type is 'density', returns an ndarray
1073 of shape (2**n_qubits, 2**n_qubits).
1074 - If execution_type is 'probs', returns an ndarray
1075 of shape (2**n_qubits,) if output_qubit is -1, else
1076 (2**len(output_qubit),).
1077 """
1078 # Call forward method which handles the actual caching etc.
1079 return self._forward(
1080 params=params,
1081 inputs=inputs,
1082 enc_params=enc_params,
1083 noise_params=noise_params,
1084 cache=cache,
1085 execution_type=execution_type,
1086 force_mean=force_mean,
1087 )
1089 def _forward(
1090 self,
1091 params: Optional[np.ndarray] = None,
1092 inputs: Optional[np.ndarray] = None,
1093 enc_params: Optional[np.ndarray] = None,
1094 noise_params: Optional[Dict[str, Union[float, Dict[str, float]]]] = None,
1095 cache: Optional[bool] = False,
1096 execution_type: Optional[str] = None,
1097 force_mean: bool = False,
1098 ) -> np.ndarray:
1099 """
1100 Perform a forward pass of the quantum circuit.
1102 Args:
1103 params (Optional[np.ndarray]): Weight vector of shape
1104 [n_layers, n_qubits*n_params_per_layer].
1105 If None, model internal parameters are used.
1106 inputs (Optional[np.ndarray]): Input vector of shape [1].
1107 If None, zeros are used.
1108 enc_params (Optional[np.ndarray]): Weight vector of shape
1109 [n_qubits, n_input_features]. If None, model internal encoding
1110 parameters are used.
1111 noise_params (Optional[Dict[str, float]], optional): The noise parameters.
1112 Defaults to None which results in the last
1113 set noise parameters being used.
1114 cache (Optional[bool], optional): Whether to cache the results.
1115 Defaults to False.
1116 execution_type (str, optional): The type of execution.
1117 Must be one of 'expval', 'density', or 'probs'.
1118 Defaults to None which results in the last set execution type
1119 being used.
1120 force_mean (bool, optional): Whether to average
1121 when performing n-local measurements.
1122 Defaults to False.
1124 Returns:
1125 np.ndarray: The output of the quantum circuit.
1126 The shape depends on the execution_type.
1127 - If execution_type is 'expval', returns an ndarray of shape
1128 (1,) if output_qubit is -1, else (len(output_qubit),).
1129 - If execution_type is 'density', returns an ndarray
1130 of shape (2**n_qubits, 2**n_qubits).
1131 - If execution_type is 'probs', returns an ndarray
1132 of shape (2**n_qubits,) if output_qubit is -1, else
1133 (2**len(output_qubit),).
1135 Raises:
1136 NotImplementedError: If the number of shots is not None or if the
1137 expectation value is True.
1138 """
1139 # set the parameters as object attributes
1140 if noise_params is not None:
1141 self.noise_params = noise_params
1142 if execution_type is not None:
1143 self.execution_type = execution_type
1145 params = self._params_validation(params)
1146 inputs = self._inputs_validation(inputs)
1147 enc_params = self._enc_params_validation(enc_params)
1148 inputs, params, self.batch_shape = self._assimilate_batch(inputs, params)
1149 # the qasm representation contains the bound parameters,
1150 # thus it is ok to hash that
1151 hs = hashlib.md5(
1152 repr(
1153 {
1154 "n_qubits": self.n_qubits,
1155 "n_layers": self.n_layers,
1156 "pqc": self.pqc.__class__.__name__,
1157 "dru": self.data_reupload,
1158 "params": self.params, # use safe-params
1159 "enc_params": self.enc_params,
1160 "noise_params": self.noise_params,
1161 "execution_type": self.execution_type,
1162 "inputs": inputs,
1163 "output_qubit": self.output_qubit,
1164 }
1165 ).encode("utf-8")
1166 ).hexdigest()
1168 result: Optional[np.ndarray] = None
1169 if cache:
1170 name: str = f"pqc_{hs}.npy"
1172 cache_folder: str = ".cache"
1173 if not os.path.exists(cache_folder):
1174 os.mkdir(cache_folder)
1176 file_path: str = os.path.join(cache_folder, name)
1178 if os.path.isfile(file_path):
1179 result = np.load(file_path)
1181 if result is None:
1182 # if density matrix requested or noise params used
1183 if self._requires_density():
1184 result = self._mp_executor(
1185 f=self.circuit_mixed,
1186 params=params, # use arraybox params
1187 inputs=inputs,
1188 enc_params=enc_params,
1189 )
1190 else:
1191 if not isinstance(self.circuit, qml.QNode):
1192 result = self.circuit(
1193 inputs=inputs,
1194 )
1195 else:
1196 result = self._mp_executor(
1197 f=self.circuit,
1198 params=params, # use arraybox params
1199 inputs=inputs,
1200 enc_params=enc_params,
1201 )
1203 if isinstance(result, list):
1204 result = np.stack(result)
1206 if self.execution_type == "expval" and force_mean and self.output_qubit == -1:
1207 # exception for torch layer because it swaps batch and output dimension
1208 if not isinstance(self.circuit, qml.QNode):
1209 result = result.mean(axis=-1)
1210 else:
1211 result = result.mean(axis=0)
1212 elif self.execution_type == "probs" and force_mean and self.output_qubit == -1:
1213 # exception for torch layer because it swaps batch and output dimension
1214 if not isinstance(self.circuit, qml.QNode):
1215 result = result[..., -1].sum(axis=-1)
1216 else:
1217 result = result[1:, ...].sum(axis=0)
1219 if self.batch_shape[0] > 1 and self.batch_shape[1] > 1:
1220 result = result.reshape(-1, *self.batch_shape)
1222 result = result.squeeze()
1224 if cache:
1225 np.save(file_path, result)
1227 return result