Coverage for qml_essentials/coefficients.py: 96%
391 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-12-22 11:59 +0000
« prev ^ index » next coverage.py v7.9.2, created at 2025-12-22 11:59 +0000
1from __future__ import annotations
2import math
3from collections import defaultdict
4from dataclasses import dataclass
5import pennylane as qml
6import pennylane.numpy as np
7import numpy as nnp
8from pennylane.operation import Operator
9import pennylane.ops.op_math as qml_op
10from scipy.stats import rankdata
11from functools import reduce
12from typing import List, Tuple, Optional, Any, Dict, Union
14from qml_essentials.model import Model
16import logging
18log = logging.getLogger(__name__)
21class Coefficients:
22 @staticmethod
23 def get_spectrum(
24 model: Model,
25 mfs: int = 1,
26 mts: int = 1,
27 shift=False,
28 trim=False,
29 **kwargs,
30 ) -> Tuple[np.ndarray, np.ndarray]:
31 """
32 Extracts the coefficients of a given model using a FFT (np-fft).
34 Note that the coefficients are complex numbers, but the imaginary part
35 of the coefficients should be very close to zero, since the expectation
36 values of the Pauli operators are real numbers.
38 It can perform oversampling in both the frequency and time domain
39 using the `mfs` and `mts` arguments.
41 Args:
42 model (Model): The model to sample.
43 mfs (int): Multiplicator for the highest frequency. Default is 2.
44 mts (int): Multiplicator for the number of time samples. Default is 1.
45 shift (bool): Whether to apply np-fftshift. Default is False.
46 trim (bool): Whether to remove the Nyquist frequency if spectrum is even.
47 Default is False.
48 kwargs (Any): Additional keyword arguments for the model function.
50 Returns:
51 Tuple[np.ndarray, np.ndarray]: Tuple containing the coefficients
52 and frequencies.
53 """
54 kwargs.setdefault("force_mean", True)
55 kwargs.setdefault("execution_type", "expval")
57 coeffs, freqs = Coefficients._fourier_transform(
58 model, mfs=mfs, mts=mts, **kwargs
59 )
61 if not np.isclose(np.sum(coeffs).imag, 0.0, rtol=1.0e-5):
62 raise ValueError(
63 f"Spectrum is not real. Imaginary part of coefficients is:\
64 {np.sum(coeffs).imag}"
65 )
67 if trim:
68 for ax in range(model.n_input_feat):
69 if coeffs.shape[ax] % 2 == 0:
70 coeffs = np.delete(coeffs, len(coeffs) // 2, axis=ax)
71 freqs = np.delete(freqs, len(freqs) // 2, axis=ax)
73 if shift:
74 coeffs = np.fft.fftshift(coeffs, axes=list(range(model.n_input_feat)))
75 freqs = np.fft.fftshift(freqs)
77 if len(freqs) == 1:
78 freqs = freqs[0]
80 return coeffs, freqs
82 @staticmethod
83 def _fourier_transform(
84 model: Model, mfs: int, mts: int, **kwargs: Any
85 ) -> np.ndarray:
86 # Create a frequency vector with as many frequencies as model degrees,
87 # oversampled by nfs
88 n_freqs: np.ndarray = np.array(
89 [2 * mfs * model.frequencies[i] + 1 for i in range(model.n_input_feat)]
90 )
92 start, stop, step = 0, 2 * mts * np.pi, 2 * np.pi / n_freqs
93 # Stretch according to the number of frequencies
94 inputs: List = [
95 np.arange(start, stop, step[i]) for i in range(model.n_input_feat)
96 ]
98 # permute with input dimensionality
99 nd_inputs = np.array(
100 np.meshgrid(*[inputs[i] for i in range(model.n_input_feat)])
101 ).T.reshape(-1, model.n_input_feat)
103 # Output vector is not necessarily the same length as input
104 outputs = model(inputs=nd_inputs, **kwargs)
105 outputs = outputs.reshape(
106 *[inputs[i].shape[0] for i in range(model.n_input_feat)], -1
107 ).squeeze()
109 coeffs = np.fft.fftn(outputs, axes=list(range(model.n_input_feat)))
111 # TODO: in the future, this should take into account that there can be a
112 # different number of frequencies per dimension
113 freqs = [
114 np.fft.fftfreq(int(mts * n_freqs[i]), 1 / n_freqs[i])
115 for i in range(model.n_input_feat)
116 ]
117 # freqs = np.fft.fftfreq(mts * n_freqs, 1 / n_freqs)
119 # TODO: this could cause issues with multidim input
120 # FIXME: account for different frequencies in multidim input scenarios
121 # Run the fft and rearrange +
122 # normalize the output (using product if multidim)
123 return (
124 coeffs / np.prod(outputs.shape[0 : model.n_input_feat]),
125 freqs,
126 )
128 @staticmethod
129 def get_psd(coeffs: np.ndarray) -> np.ndarray:
130 """
131 Calculates the power spectral density (PSD) from given Fourier coefficients.
133 Args:
134 coeffs (np.ndarray): The Fourier coefficients.
136 Returns:
137 np.ndarray: The power spectral density.
138 """
139 # TODO: if we apply trim=True in advance, this will be slightly wrong..
141 def abs2(x):
142 return x.real**2 + x.imag**2
144 scale = 2.0 / (len(coeffs) ** 2)
145 return scale * abs2(coeffs)
147 @staticmethod
148 def evaluate_Fourier_series(
149 coefficients: np.ndarray,
150 frequencies: np.ndarray,
151 inputs: Union[np.ndarray, list, float],
152 ) -> float:
153 """
154 Evaluate the function value of a Fourier series at one point.
156 Args:
157 coefficients (np.ndarray): Coefficients of the Fourier series.
158 frequencies (np.ndarray): Corresponding frequencies.
159 inputs (np.ndarray): Point at which to evaluate the function.
160 Returns:
161 float: The function value at the input point.
162 """
163 dims = len(coefficients.shape)
165 if not isinstance(inputs, (np.ndarray, list)):
166 inputs = [inputs]
168 frequencies = np.stack(np.meshgrid(*frequencies)).T.reshape(-1, dims)
169 freq_inputs = np.einsum("...j,j->...", frequencies, inputs)
170 coeffs = coefficients.flatten()
171 freq_inputs = freq_inputs.flatten()
173 exp = 0.0
174 for omega_x, c in zip(freq_inputs, coeffs):
175 exp += c * np.exp(1j * omega_x)
177 return np.real_if_close(exp)
180class FourierTree:
181 """
182 Sine-cosine tree representation for the algorithm by Nemkov et al.
183 This tree can be used to obtain analytical Fourier coefficients for a given
184 Pauli-Clifford circuit.
185 """
187 class CoefficientsTreeNode:
188 """
189 Representation of a node in the coefficients tree for the algorithm by
190 Nemkov et al.
191 """
193 def __init__(
194 self,
195 parameter_idx: Optional[int],
196 observable: FourierTree.PauliOperator,
197 is_sine_factor: bool,
198 is_cosine_factor: bool,
199 left: Optional[FourierTree.CoefficientsTreeNode] = None,
200 right: Optional[FourierTree.CoefficientsTreeNode] = None,
201 ):
202 """
203 Coefficient tree node initialisation. Each node has information about
204 its creation context and it's children, i.e.:
206 Args:
207 parameter_idx (Optional[int]): Index of the corresp. param. index i.
208 observable (FourierTree.PauliOperator): The nodes observable to
209 obtain the expectation value that contributes to the constant
210 term.
211 is_sine_factor (bool): If this node belongs to a sine coefficient.
212 is_cosine_factor (bool): If this node belongs to a cosine coefficient.
213 left (Optional[CoefficientsTreeNode]): left child (if any).
214 right (Optional[CoefficientsTreeNode]): right child (if any).
215 """
216 self.parameter_idx = parameter_idx
218 assert not (
219 is_sine_factor and is_cosine_factor
220 ), "Cannot be sine and cosine at the same time"
221 self.is_sine_factor = is_sine_factor
222 self.is_cosine_factor = is_cosine_factor
224 # If the observable does not constist of only Z and I, the
225 # expectation (and therefore the constant node term) is zero
226 if np.logical_or(
227 observable.list_repr == 0, observable.list_repr == 1
228 ).any():
229 self.term = 0.0
230 else:
231 self.term = observable.phase
233 self.left = left
234 self.right = right
236 def evaluate(self, parameters: list[float]) -> float:
237 """
238 Recursive function to evaluate the expectation of the coefficient tree,
239 starting from the current node.
241 Args:
242 parameters (list[float]): The parameters, by which the circuit (and
243 therefore the tree) is parametrised.
245 Returns:
246 float: The expectation for the current node and it's children.
247 """
248 factor = (
249 parameters[self.parameter_idx]
250 if self.parameter_idx is not None
251 else 1.0
252 )
253 if self.is_sine_factor:
254 factor = 1j * np.sin(factor)
255 elif self.is_cosine_factor:
256 factor = np.cos(factor)
257 if not (self.left or self.right): # leaf
258 return factor * self.term
260 sum_children = 0.0
261 if self.left:
262 left = self.left.evaluate(parameters)
263 sum_children = sum_children + left
264 if self.right:
265 right = self.right.evaluate(parameters)
266 sum_children = sum_children + right
268 return factor * sum_children
270 def get_leafs(
271 self,
272 sin_list: List[int],
273 cos_list: List[int],
274 existing_leafs: List[FourierTree.TreeLeaf] = [],
275 ) -> List[FourierTree.TreeLeaf]:
276 """
277 Traverse the tree starting from the current node, to obtain the tree
278 leafs only.
279 The leafs correspond to the terms in the sine-cosine tree
280 representation that eventually are used to obtain coefficients and
281 frequencies.
282 Sine and cosine lists are recursively passed to the children until a
283 leaf is reached (top to bottom).
284 Leafs are then passed bottom to top to the caller.
286 Args:
287 sin_list (List[int]): Current number of sine contributions for each
288 parameter. Has the same length as the parameters, as each
289 position corresponds to one parameter.
290 cos_list (List[int]): Current number of cosine contributions for
291 each parameter. Has the same length as the parameters, as each
292 position corresponds to one parameter.
293 existing_leafs (List[TreeLeaf]): Current list of leaf nodes from
294 parents.
296 Returns:
297 List[TreeLeaf]: Updated list of leaf nodes.
298 """
300 if self.is_sine_factor:
301 sin_list[self.parameter_idx] += 1
302 if self.is_cosine_factor:
303 cos_list[self.parameter_idx] += 1
305 if not (self.left or self.right): # leaf
306 if self.term != 0.0:
307 return [FourierTree.TreeLeaf(sin_list, cos_list, self.term)]
308 else:
309 return []
311 if self.left:
312 leafs_left = self.left.get_leafs(
313 sin_list.copy(), cos_list.copy(), existing_leafs.copy()
314 )
315 else:
316 leafs_left = []
318 if self.right:
319 leafs_right = self.right.get_leafs(
320 sin_list.copy(), cos_list.copy(), existing_leafs.copy()
321 )
322 else:
323 leafs_right = []
325 existing_leafs.extend(leafs_left)
326 existing_leafs.extend(leafs_right)
327 return existing_leafs
329 @dataclass
330 class TreeLeaf:
331 """
332 Coefficient tree leafs according to the algorithm by Nemkov et al., which
333 correspond to the terms in the sine-cosine tree representation that
334 eventually are used to obtain coefficients and frequencies.
336 Args:
337 sin_indices (List[int]): Current number of sine contributions for each
338 parameter. Has the same length as the parameters, as each
339 position corresponds to one parameter.
340 cos_list (List[int]): Current number of cosine contributions for
341 each parameter. Has the same length as the parameters, as each
342 position corresponds to one parameter.
343 term (np.complex): Constant factor of the leaf, depending on the
344 expectation value of the observable, and a phase.
345 """
347 sin_indices: List[int]
348 cos_indices: List[int]
349 term: np.complex128
351 class PauliOperator:
352 """
353 Utility class for storing Pauli Rotations, the corresponding indices
354 in the XY-Space (whether there is a gate with X or Y generator at a
355 certain qubit) and the phase.
357 Args:
358 pauli (Union[Operator, np.ndarray[int]]): Pauli Rotation Operation
359 or list representation
360 n_qubits (int): Number of qubits in the circuit
361 prev_xy_indices (Optional[np.ndarray[bool]]): X/Y indices of the
362 previous Pauli sequence. Defaults to None.
363 is_observable (bool): If the operator is an observable. Defaults to
364 False.
365 is_init (bool): If this Pauli operator is initialised the first
366 time. Defaults to True.
367 phase (float): Phase of the operator. Defaults to 1.0
368 """
370 def __init__(
371 self,
372 pauli: Union[Operator, np.ndarray[int]],
373 n_qubits: int,
374 prev_xy_indices: Optional[np.ndarray[bool]] = None,
375 is_observable: bool = False,
376 is_init: bool = True,
377 phase: float = 1.0,
378 ):
379 self.is_observable = is_observable
380 self.phase = phase
382 if is_init:
383 if not is_observable:
384 pauli = pauli.generator()[0].base
385 self.list_repr = self._create_list_representation(pauli, n_qubits)
386 else:
387 assert isinstance(pauli, np.ndarray)
388 self.list_repr = pauli
390 if prev_xy_indices is None:
391 prev_xy_indices = np.zeros(n_qubits, dtype=bool)
392 self.xy_indices = np.logical_or(
393 prev_xy_indices,
394 self._compute_xy_indices(self.list_repr, rev=is_observable),
395 )
397 @staticmethod
398 def _compute_xy_indices(
399 op: np.ndarray[int], rev: bool = False
400 ) -> np.ndarray[bool]:
401 """
402 Computes the positions of X or Y gates to an one-hot encoded boolen
403 array.
405 Args:
406 op (np.ndarray[int]): Pauli-Operation list representation.
407 rev (bool): Whether to negate the array.
409 Returns:
410 np.ndarray[bool]: One hot encoded boolean array.
411 """
412 xy_indices = (op == 0) + (op == 1)
413 if rev:
414 xy_indices = ~xy_indices
415 return xy_indices
417 @staticmethod
418 def _create_list_representation(op: Operator, n_qubits: int) -> np.ndarray[int]:
419 """
420 Create list representation of a Pennylane Operator.
421 Generally, the list representation is a list of length n_qubits,
422 where at each position a Pauli Operator is encoded as such:
423 I: -1
424 X: 0
425 Y: 1
426 Z: 2
428 Args:
429 op (Operator): Pennylane Operator
430 n_qubits (int): number of qubits in the circuit
432 Returns:
433 np.ndarray[int]: List representation
434 """
435 pauli_repr = -np.ones(n_qubits, dtype=int)
436 op = op.terms()[1][0] if isinstance(op, qml_op.Prod) else op
437 op = op.base if isinstance(op, qml_op.SProd) else op
439 if isinstance(op, qml.PauliX):
440 pauli_repr[op.wires[0]] = 0
441 elif isinstance(op, qml.PauliY):
442 pauli_repr[op.wires[0]] = 1
443 elif isinstance(op, qml.PauliZ):
444 pauli_repr[op.wires[0]] = 2
445 else:
446 for o in op:
447 if isinstance(o, qml.PauliX):
448 pauli_repr[o.wires[0]] = 0
449 elif isinstance(o, qml.PauliY):
450 pauli_repr[o.wires[0]] = 1
451 elif isinstance(o, qml.PauliZ):
452 pauli_repr[o.wires[0]] = 2
453 return pauli_repr
455 def is_commuting(self, pauli: np.ndarray[int]) -> bool:
456 """
457 Computes if this Pauli commutes with another Pauli operator.
458 This computation is based on the fact that The commutator is zero
459 if and only if the number of anticommuting single-qubit Paulis is
460 even.
462 Args:
463 pauli (np.ndarray[int]): List representation of another Pauli
465 Returns:
466 bool: If the current and other Pauli are commuting.
467 """
468 anticommutator = np.where(
469 pauli < 0,
470 False,
471 np.where(
472 self.list_repr < 0,
473 False,
474 np.where(self.list_repr == pauli, False, True),
475 ),
476 )
477 return not (np.sum(anticommutator) % 2)
479 def tensor(self, pauli: np.ndarray[int]) -> FourierTree.PauliOperator:
480 """
481 Compute tensor product between the current Pauli and a given list
482 representation of another Pauli operator.
484 Args:
485 pauli (np.ndarray[int]): List representation of Pauli
487 Returns:
488 FourierTree.PauliOperator: New Pauli operator object, which
489 contains the tensor product
490 """
491 diff = (pauli - self.list_repr + 3) % 3
492 phase = np.where(
493 self.list_repr < 0,
494 1.0,
495 np.where(
496 pauli < 0,
497 1.0,
498 np.where(
499 diff == 2,
500 1.0j,
501 np.where(diff == 1, -1.0j, 1.0),
502 ),
503 ),
504 )
506 obs = np.where(
507 self.list_repr < 0,
508 pauli,
509 np.where(
510 pauli < 0,
511 self.list_repr,
512 np.where(
513 diff == 2,
514 (self.list_repr + 1) % 3,
515 np.where(diff == 1, (self.list_repr + 2) % 3, -1),
516 ),
517 ),
518 )
519 phase = self.phase * np.prod(phase)
520 return FourierTree.PauliOperator(
521 obs, phase=phase, n_qubits=obs.size, is_init=False, is_observable=True
522 )
524 def __init__(self, model: Model, inputs=1.0):
525 """
526 Tree initialisation, based on the Pauli-Clifford representation of a model.
527 Currently, only one input feature is supported.
529 **Usage**:
530 ```
531 # initialise a model
532 model = Model(...)
534 # initialise and build FourierTree
535 tree = FourierTree(model)
537 # get expectaion value
538 exp = tree()
540 # Get spectrum (for each observable, we have one list element)
541 coeff_list, freq_list = tree.spectrum()
542 ```
544 Args:
545 model (Model): The Model, for which to build the tree
546 inputs (bool, optional): Possible default inputs. Defaults to 1.0.
547 """
548 self.model = model
549 self.tree_roots = None
551 if not model.as_pauli_circuit:
552 model.as_pauli_circuit = True
554 inputs = (
555 self.model._inputs_validation(inputs)
556 if inputs is not None
557 else self.model._inputs_validation(1.0)
558 )
559 inputs.requires_grad = False
561 quantum_tape = qml.workflow.construct_tape(self.model.circuit)(
562 params=model.params, inputs=inputs
563 )
564 self.parameters = [np.squeeze(p) for p in quantum_tape.get_parameters()]
565 self.input_indices = [
566 i for (i, p) in enumerate(self.parameters) if not p.requires_grad
567 ]
569 self.observables = self._encode_observables(quantum_tape.observables)
571 pauli_rot = FourierTree.PauliOperator(
572 quantum_tape.operations[0],
573 self.model.n_qubits,
574 )
575 self.pauli_rotations = [pauli_rot]
576 for op in quantum_tape.operations[1:]:
577 pauli_rot = FourierTree.PauliOperator(
578 op, self.model.n_qubits, pauli_rot.xy_indices
579 )
580 self.pauli_rotations.append(pauli_rot)
582 self.tree_roots = self.build()
583 self.leafs: List[List[FourierTree.TreeLeaf]] = self._get_tree_leafs()
585 def __call__(
586 self,
587 params: Optional[np.ndarray] = None,
588 inputs: Optional[np.ndarray] = None,
589 **kwargs,
590 ) -> np.ndarray:
591 """
592 Evaluates the Fourier tree via sine-cosine terms sum. This is
593 equivalent to computing the expectation value of the observables with
594 respect to the corresponding circuit.
596 Args:
597 params (Optional[np.ndarray], optional): Parameters of the model.
598 Defaults to None.
599 inputs (Optional[np.ndarray], optional): Inputs to the circuit.
600 Defaults to None.
602 Returns:
603 np.ndarray: Expectation value of the tree.
605 Raises:
606 NotImplementedError: When using other "execution_type" as expval.
607 NotImplementedError: When using "noise_params"
610 """
611 params = (
612 self.model._params_validation(params)
613 if params is not None
614 else self.model.params
615 )
616 inputs = (
617 self.model._inputs_validation(inputs)
618 if inputs is not None
619 else self.model._inputs_validation(1.0)
620 )
621 inputs.requires_grad = False
623 if kwargs.get("execution_type", "expval") != "expval":
624 raise NotImplementedError(
625 f'Currently, only "expval" execution type is supported when '
626 f"building FourierTree. Got {kwargs.get('execution_type', 'expval')}."
627 )
628 if kwargs.get("noise_params", None) is not None:
629 raise NotImplementedError(
630 "Currently, noise is not supported when building FourierTree."
631 )
633 quantum_tape = qml.workflow.construct_tape(self.model.circuit)(
634 params=self.model.params, inputs=inputs
635 )
636 self.parameters = [np.squeeze(p) for p in quantum_tape.get_parameters()]
637 self.input_indices = [
638 i for (i, p) in enumerate(self.parameters) if not p.requires_grad
639 ]
641 results = np.zeros(len(self.tree_roots))
642 for i, root in enumerate(self.tree_roots):
643 results[i] = np.real_if_close(root.evaluate(self.parameters))
645 if kwargs.get("force_mean", False):
646 return np.mean(results)
647 else:
648 return results
650 def build(self) -> List[CoefficientsTreeNode]:
651 """
652 Creates the coefficient tree, i.e. it creates and initialises the tree
653 nodes.
654 Leafs can be obtained separately in _get_tree_leafs, once the tree is
655 set up.
657 Returns:
658 List[CoefficientsTreeNode]: The list of root nodes (one root for
659 each observable).
660 """
661 tree_roots = []
662 pauli_rotation_idx = len(self.pauli_rotations) - 1
663 for obs in self.observables:
664 root = self._create_tree_node(obs, pauli_rotation_idx)
665 tree_roots.append(root)
666 return tree_roots
668 def _encode_observables(
669 self, tape_obs: List[Operator]
670 ) -> List[FourierTree.PauliOperator]:
671 """
672 Encodes Pennylane observables from tape as FourierTree.PauliOperator
673 utility objects.
675 Args:
676 tape_obs (List[Operator]): Pennylane tape operations
678 Returns:
679 List[FourierTree.PauliOperator]: List of Pauli operators
680 """
681 observables = []
682 for obs in tape_obs:
683 observables.append(
684 FourierTree.PauliOperator(obs, self.model.n_qubits, is_observable=True)
685 )
686 return observables
688 def _get_tree_leafs(self) -> List[List[TreeLeaf]]:
689 """
690 Obtain all Leaf Nodes with its sine- and cosine terms.
692 Returns:
693 List[List[TreeLeaf]]: For each observable (root), the list of leaf
694 nodes.
695 """
696 leafs = []
697 for root in self.tree_roots:
698 sin_list = np.zeros(len(self.parameters), dtype=np.int32)
699 cos_list = np.zeros(len(self.parameters), dtype=np.int32)
700 leafs.append(root.get_leafs(sin_list, cos_list, []))
701 return leafs
703 def get_spectrum(
704 self, force_mean: bool = False
705 ) -> Tuple[List[np.ndarray], List[np.ndarray]]:
706 """
707 Computes the Fourier spectrum for the tree, consisting of the
708 frequencies and its corresponding coefficinets.
709 If the frag force_mean was set in the constructor, the mean coefficient
710 over all observables (roots) are computed.
712 Args:
713 force_mean (bool, optional): Whether to average over multiple
714 observables. Defaults to False.
716 Returns:
717 Tuple[List[np.ndarray], List[np.ndarray]]:
718 - List of frequencies, one list for each observable (root).
719 - List of corresponding coefficents, one list for each
720 observable (root).
721 """
722 parameter_indices = [
723 i for i in range(len(self.parameters)) if i not in self.input_indices
724 ]
726 coeffs = []
727 for leafs in self.leafs:
728 freq_terms = defaultdict(np.complex128)
729 for leaf in leafs:
730 leaf_factor, s, c = self._compute_leaf_factors(leaf, parameter_indices)
732 for a in range(s + 1):
733 for b in range(c + 1):
734 comb = math.comb(s, a) * math.comb(c, b) * (-1) ** (s - a)
735 freq_terms[2 * a + 2 * b - s - c] += comb * leaf_factor
737 coeffs.append(freq_terms)
739 frequencies, coefficients = self._freq_terms_to_coeffs(coeffs, force_mean)
740 return coefficients, frequencies
742 def _freq_terms_to_coeffs(
743 self, coeffs: List[Dict[int, np.ndarray]], force_mean: bool
744 ) -> Tuple[List[np.ndarray], List[np.ndarray]]:
745 """
746 Given a list of dictionaries of the form:
747 [
748 {
749 freq_obs1_1: coeff1,
750 freq_obs1_2: coeff2,
751 ...
752 },
753 {
754 freq_obs2_1: coeff3,
755 freq_obs2_2: coeff4,
756 ...
757 }
758 ...
759 ],
760 Compute two separate lists of frequencies and coefficients.
761 such that:
762 freqs: [
763 [freq_obs1_1, freq_obs1_1, ...],
764 [freq_obs2_1, freq_obs2_1, ...],
765 ...
766 ]
767 coeffs: [
768 [coeff1, coeff2, ...],
769 [coeff3, coeff4, ...],
770 ...
771 ]
773 If force_mean is set length of the resulting frequency and coefficent
774 list is 1.
776 Args:
777 coeffs (List[Dict[int, np.ndarray]]): Frequency->Coefficients
778 dictionary list, one dict for each observable (root).
779 force_mean (bool): Whether to average coefficients over multiple
780 observables.
782 Returns:
783 Tuple[List[np.ndarray], List[np.ndarray]]:
784 - List of frequencies, one list for each observable (root).
785 - List of corresponding coefficents, one list for each
786 observable (root).
787 """
788 frequencies = []
789 coefficients = []
790 if force_mean:
791 all_freqs = sorted(set([f for c in coeffs for f in c.keys()]))
792 coefficients.append(
793 np.array([np.mean([c.get(f, 0.0) for c in coeffs]) for f in all_freqs])
794 )
795 frequencies.append(np.array(all_freqs))
796 else:
797 for freq_terms in coeffs:
798 freq_terms = dict(sorted(freq_terms.items()))
799 frequencies.append(np.array(list(freq_terms.keys())))
800 coefficients.append(np.array(list(freq_terms.values())))
801 return frequencies, coefficients
803 def _compute_leaf_factors(
804 self, leaf: TreeLeaf, parameter_indices: List[int]
805 ) -> Tuple[float, int, int]:
806 """
807 Computes the constant coefficient factor for each leaf.
808 Additionally sine and cosine contributions of the input parameters for
809 this leaf are returned, which are required to obtain the corresponding
810 frequencies.
812 Args:
813 leaf (TreeLeaf): The leaf for which to compute the factor.
814 parameter_indices (List[int]): Variational parameter indices.
816 Returns:
817 Tuple[float, int, int]:
818 - float: the constant factor for the leaf
819 - int: number of sine contributions of the input
820 - int: number of cosine contributions of the input
821 """
822 leaf_factor = 1.0
823 for i in parameter_indices:
824 interm_factor = (
825 np.cos(self.parameters[i]) ** leaf.cos_indices[i]
826 * (1j * np.sin(self.parameters[i])) ** leaf.sin_indices[i]
827 )
828 leaf_factor = leaf_factor * interm_factor
830 # Get number of sine and cosine factors to which the input contributes
831 c = np.sum([leaf.cos_indices[k] for k in self.input_indices], dtype=np.int32)
832 s = np.sum([leaf.sin_indices[k] for k in self.input_indices], dtype=np.int32)
834 leaf_factor = leaf.term * leaf_factor * 0.5 ** (s + c)
836 return leaf_factor, s, c
838 def _early_stopping_possible(
839 self, pauli_rotation_idx: int, observable: FourierTree.PauliOperator
840 ):
841 """
842 Checks if a node for an observable can be discarded as all expecation
843 values that can result through further branching are zero.
844 The method is mentioned in the paper by Nemkov et al.: If the one-hot
845 encoded indices for X/Y operations in the Pauli-rotation generators are
846 a basis for that of the observable, the node must be processed further.
847 If not, it can be discarded.
849 Args:
850 pauli_rotation_idx (int): Index of remaining Pauli rotation gates.
851 Gates itself are attributes of the class.
852 observable (FourierTree.PauliOperator): Current observable
853 """
854 xy_indices_obs = np.logical_or(
855 observable.xy_indices, self.pauli_rotations[pauli_rotation_idx].xy_indices
856 ).all()
858 return not xy_indices_obs
860 def _create_tree_node(
861 self,
862 observable: FourierTree.PauliOperator,
863 pauli_rotation_idx: int,
864 parameter_idx: Optional[int] = None,
865 is_sine: bool = False,
866 is_cosine: bool = False,
867 ) -> Optional[CoefficientsTreeNode]:
868 """
869 Builds the Fourier-Tree according to the algorithm by Nemkov et al.
871 Args:
872 observable (FourierTree.PauliOperator): Current observable
873 pauli_rotation_idx (int): Index of remaining Pauli rotation gates.
874 Gates itself are attributes of the class.
875 parameter_idx (Optional[int]): Index of the current parameter.
876 Parameters itself are attributes of the class.
877 is_sine (bool): If the current node is a sine (left) node.
878 is_cosine (bool): If the current node is a cosine (right) node.
880 Returns:
881 Optional[CoefficientsTreeNode]: The resulting node. Children are set
882 recursively. The top level receives the tree root.
883 """
884 if self._early_stopping_possible(pauli_rotation_idx, observable):
885 return None
887 # remove commuting paulis
888 while pauli_rotation_idx >= 0:
889 last_pauli = self.pauli_rotations[pauli_rotation_idx]
890 if not observable.is_commuting(last_pauli.list_repr):
891 break
892 pauli_rotation_idx -= 1
893 else: # leaf
894 return FourierTree.CoefficientsTreeNode(
895 parameter_idx, observable, is_sine, is_cosine
896 )
898 last_pauli = self.pauli_rotations[pauli_rotation_idx]
900 left = self._create_tree_node(
901 observable,
902 pauli_rotation_idx - 1,
903 pauli_rotation_idx,
904 is_cosine=True,
905 )
907 next_observable = self._create_new_observable(last_pauli.list_repr, observable)
908 right = self._create_tree_node(
909 next_observable,
910 pauli_rotation_idx - 1,
911 pauli_rotation_idx,
912 is_sine=True,
913 )
915 return FourierTree.CoefficientsTreeNode(
916 parameter_idx,
917 observable,
918 is_sine,
919 is_cosine,
920 left,
921 right,
922 )
924 def _create_new_observable(
925 self, pauli: np.ndarray[int], observable: FourierTree.PauliOperator
926 ) -> FourierTree.PauliOperator:
927 """
928 Utility function to obtain the new observable for a tree node, if the
929 last Pauli and the observable do not commute.
931 Args:
932 pauli (np.ndarray[int]): The int array representation of the last
933 Pauli rotation in the operation sequence.
934 observable (FourierTree.PauliOperator): The current observable.
936 Returns:
937 FourierTree.PauliOperator: The new observable.
938 """
939 observable = observable.tensor(pauli)
940 return observable
943class FCC:
944 @staticmethod
945 def get_fcc(
946 model: Model,
947 n_samples: int,
948 seed: int,
949 method: Optional[str] = "pearson",
950 scale: Optional[bool] = False,
951 weight: Optional[bool] = False,
952 trim_redundant: Optional[bool] = True,
953 **kwargs,
954 ) -> float:
955 """
956 Shortcut method to get just the FCC.
957 This includes
958 1. What is done in `get_fourier_fingerprint`:
959 1. Calculating the coefficients (using `n_samples` and `seed`)
960 2. Correlating the result from 1) using `method`
961 3. Weighting the correlation matrix (if `weight` is True)
962 4. Remove redundancies
963 2. What is done in `calculate_fcc`:
964 1. Absolute of the fingerprint
965 2. Average
967 Args:
968 model (Model): The QFM model
969 n_samples (int): Number of samples to calculate average of coefficients
970 seed (int): Seed to initialize random parameters
971 method (Optional[str], optional): Correlation method. Defaults to "pearson".
972 scale (Optional[bool], optional): Whether to scale the number of samples.
973 Defaults to False.
974 weight (Optional[bool], optional): Whether to weight the correlation matrix.
975 Defaults to False.
976 trim_redundant (Optional[bool], optional): Whether to remove redundant
977 correlations. Defaults to False.
978 **kwargs: Additional keyword arguments for the model function.
980 Returns:
981 float: The FCC
982 """
983 fourier_fingerprint, _ = FCC.get_fourier_fingerprint(
984 model,
985 n_samples,
986 seed,
987 method,
988 scale,
989 weight,
990 trim_redundant=trim_redundant,
991 **kwargs,
992 )
994 return FCC.calculate_fcc(fourier_fingerprint)
996 def get_fourier_fingerprint(
997 model: Model,
998 n_samples: int,
999 seed: int,
1000 method: Optional[str] = "pearson",
1001 scale: Optional[bool] = False,
1002 weight: Optional[bool] = False,
1003 trim_redundant: Optional[bool] = True,
1004 **kwargs,
1005 ) -> Tuple[np.ndarray, np.ndarray]:
1006 """
1007 Shortcut method to get just the fourier fingerprint.
1008 This includes
1009 1. Calculating the coefficients (using `n_samples` and `seed`)
1010 2. Correlating the result from 1) using `method`
1011 3. Weighting the correlation matrix (if `weight` is True)
1012 4. Remove redundancies (if `trim_redundant` is True)
1014 Args:
1015 model (Model): The QFM model
1016 n_samples (int): Number of samples to calculate average of coefficients
1017 seed (int): Seed to initialize random parameters
1018 method (Optional[str], optional): Correlation method. Defaults to "pearson".
1019 scale (Optional[bool], optional): Whether to scale the number of samples.
1020 Defaults to False.
1021 weight (Optional[bool], optional): Whether to weight the correlation matrix.
1022 Defaults to False.
1023 trim_redundant (Optional[bool], optional): Whether to remove redundant
1024 correlations. Defaults to True.
1025 **kwargs: Additional keyword arguments for the model function.
1027 Returns:
1028 Tuple[np.ndarray, np.ndarray]: The fourier fingerprint
1029 and the frequency indices
1030 """
1031 _, coeffs, freqs = FCC._calculate_coefficients(
1032 model, n_samples, seed, scale, **kwargs
1033 )
1034 fourier_fingerprint = FCC._correlate(coeffs.transpose(), method=method)
1036 # perform weighting if requested
1037 fourier_fingerprint = (
1038 FCC._weighting(fourier_fingerprint) if weight else fourier_fingerprint
1039 )
1041 if trim_redundant:
1042 mask = FCC._calculate_mask(freqs)
1044 # apply the mask on the fingerprint
1045 fourier_fingerprint = mask * fourier_fingerprint
1047 row_mask = np.any(np.isfinite(fourier_fingerprint), axis=1)
1048 col_mask = np.any(np.isfinite(fourier_fingerprint), axis=0)
1050 fourier_fingerprint = fourier_fingerprint[row_mask][:, col_mask]
1052 return fourier_fingerprint, freqs
1054 @staticmethod
1055 def calculate_fcc(
1056 fourier_fingerprint: np.ndarray,
1057 ) -> float:
1058 """
1059 Method to calculate the FCC based on an existing correlation matrix.
1060 Calculate absolute and then the average over this matrix.
1061 The Fingerprint can be obtained via `get_fourier_fingerprint`
1063 Args:
1064 coeff_coeff_correlation (np.ndarray): Correlation matrix of coefficients
1065 Returns:
1066 float: The FCC
1067 """
1068 # apply the mask on the fingerprint
1069 return np.nanmean(np.abs(fourier_fingerprint))
1071 def _calculate_mask(freqs: np.ndarray) -> np.ndarray:
1072 """
1073 Method to calculate a mask filtering out redundant elements
1074 of the Fourier correlation matrix, based on the provided frequency vector.
1075 It does so by 'simulating' the operations that would be performed
1076 by `_correlate`.
1078 Args:
1079 freqs (np.ndarray): Array of frequencies
1081 Returns:
1082 np.ndarray: The mask
1083 """
1084 # TODO: this part can be heavily optimized, by e.g. using a "positive_only"
1085 # flag when calculating the coefficients.
1086 # However this would change the numerical values
1087 # (while the order should be still the same).
1089 # disregard all the negativ frequencies
1090 freqs[freqs < 0] = np.nan
1091 # compute the outer product of the frequency vectors for arbitrary dimensions
1092 # or just use the existing frequency vector if it is 1D
1093 nd_freqs = (
1094 reduce(np.multiply, np.ix_(*freqs)) if len(freqs.shape) > 1 else freqs
1095 )
1096 # TODO: could prevent this if we're not using .squeeze()..
1098 # "simulate" what would happen on correlating the coefficients
1099 corr_freqs = np.outer(nd_freqs, nd_freqs)
1100 # mask all frequencies that are nan now
1101 # (i.e. all correlations with a negative frequency component)
1102 corr_mask = np.where(np.isnan(corr_freqs), corr_freqs, 1)
1103 # from this, disregard all the other redundant correlations (i.e. c_0_1 = c_1_0)
1104 corr_mask[np.triu_indices(corr_mask.shape[0], 0)] = np.nan
1106 return corr_mask
1108 @staticmethod
1109 def _calculate_coefficients(
1110 model: Model,
1111 n_samples: int,
1112 seed: int,
1113 scale: bool = False,
1114 **kwargs,
1115 ) -> Tuple[np.ndarray, np.ndarray]:
1116 """
1117 Calculates the Fourier coefficients of a given model
1118 using `n_samples` and `seed`.
1119 Optionally, `noise_params` can be passed to perform noisy simulation.
1121 Args:
1122 model (Model): The QFM model
1123 n_samples (int): Number of samples to calculate average of coefficients
1124 seed (int): Seed to initialize random parameters
1125 scale (bool, optional): Whether to scale the number of samples.
1126 Defaults to False.
1127 **kwargs: Additional keyword arguments for the model function.
1129 Returns:
1130 Tuple[np.ndarray, np.ndarray]: Parameters and Coefficients of size NxK
1131 """
1132 if n_samples > 0:
1133 if scale:
1134 total_samples = int(
1135 np.power(2, model.n_qubits) * n_samples * model.n_input_feat
1136 )
1137 log.info(f"Using {total_samples} samples.")
1138 else:
1139 total_samples = n_samples
1140 rng = np.random.default_rng(seed)
1141 model.initialize_params(rng=rng, repeat=total_samples)
1142 else:
1143 total_samples = 1
1145 coeffs, freqs = Coefficients.get_spectrum(
1146 model, shift=True, trim=True, **kwargs
1147 )
1149 return model.params, coeffs, freqs
1151 @staticmethod
1152 def _correlate(mat: np.ndarray, method: str = "pearson") -> np.ndarray:
1153 """
1154 Correlates two arrays using `method`.
1155 Currently, `pearson` and `spearman` are supported.
1157 Args:
1158 mat (np.ndarray): Array of shape (N, K)
1159 method (str, optional): Correlation method. Defaults to "pearson".
1161 Raises:
1162 ValueError: If the method is not supported.
1164 Returns:
1165 np.ndarray: Correlation matrix of `a` and `b`.
1166 """
1167 assert len(mat.shape) >= 2, "Input matrix must have at least 2 dimensions"
1169 # Note that for the general n-D case, we have to flatten along
1170 # the first axis (last one is batch).
1171 # Note that the order here is important so we can easily filter out
1172 # negative coefficients later.
1173 # Consider the following example: [[1,2,3],[4,5,6],[7,8,9]]
1174 # we want to get [1, 4, 7, 2, 5, 8, 3, 6, 9]
1175 # such that after correlation, all positive indexed coefficients
1176 # will be in the bottom right quadrant
1177 if method == "pearson":
1178 result = FCC._pearson(mat.reshape(mat.shape[0], -1))
1179 # result = FCC._pearson(mat.reshape(mat.shape[-1], -1, order="F"))
1180 elif method == "spearman":
1181 result = FCC._spearman(mat.reshape(mat.shape[0], -1))
1182 # result = FCC._spearman(mat.reshape(mat.shape[-1], -1, order="F"))
1183 else:
1184 raise ValueError(
1185 f"Unknown correlation method: {method}. \
1186 Must be 'pearson' or 'spearman'."
1187 )
1189 return result
1191 @staticmethod
1192 def _pearson(
1193 mat: np.ndarray, cov: Optional[bool] = False, minp: Optional[int] = 1
1194 ) -> np.ndarray:
1195 """
1196 Based on Pandas correlation method as implemented here:
1197 https://github.com/pandas-dev/pandas/blob/main/pandas/_libs/algos.pyx
1199 Compute Pearson correlation between columns of `mat`,
1200 permitting missing values (NaN or ±Inf).
1202 Args:
1203 mat : array_like, shape (N, K)
1204 Input data.
1205 minp : int, optional
1206 Minimum number of paired observations required to form a correlation.
1207 If the number of valid pairs for (i, j) is < minp, the result is NaN.
1209 Returns:
1210 corr : ndarray, shape (K, K)
1211 Pearson correlation matrix.
1212 """
1214 mat = np.asarray(mat, dtype=np.float64)
1215 N, K = mat.shape
1217 # pre‐compute finite‐mask
1218 mask = np.isfinite(mat)
1220 # output
1221 result = np.empty((K, K), dtype=np.float64)
1223 # TODO: optimize in future iterations
1224 # loop over column‐pairs
1225 for i in range(K):
1226 for j in range(i + 1):
1227 # find rows where both columns are finite
1228 m = mask[:, i] & mask[:, j]
1229 n = np.count_nonzero(m)
1230 if n < minp:
1231 # too few pairs
1232 value = np.nan
1233 else:
1234 x = mat[m, i]
1235 y = mat[m, j]
1237 # compute means
1238 mean_x = x.mean()
1239 mean_y = y.mean()
1241 # demeaned data
1242 dx = x - mean_x
1243 dy = y - mean_y
1245 # sum of squares and cross‐prod
1246 ssx = np.dot(dx, dx)
1247 ssy = np.dot(dy, dy)
1248 cxy = np.dot(dx, dy)
1250 if cov:
1251 # sample covariance (denominator n−1)
1252 value = cxy / (n - 1) if n > 1 else np.nan
1253 else:
1254 # Pearson r = cov / (σx σy)
1255 denom = np.sqrt(ssx * ssy)
1256 if denom == 0.0:
1257 value = np.nan
1258 else:
1259 value = cxy / denom
1260 # clip numerical drift
1261 if value > 1.0:
1262 value = 1.0
1263 elif value < -1.0:
1264 value = -1.0
1266 result[i, j] = result[j, i] = value
1268 return result
1270 def _spearman(mat: np.ndarray, minp: Optional[int] = 1) -> np.ndarray:
1271 """
1272 Based on Pandas correlation method as implemented here:
1273 https://github.com/pandas-dev/pandas/blob/main/pandas/_libs/algos.pyx
1275 Compute Spearman correlation between columns of `mat`,
1276 permitting missing values (NaN or ±Inf).
1278 Args:
1279 mat : array_like, shape (N, K)
1280 Input data.
1281 minp : int, optional
1282 Minimum number of paired observations required to form a correlation.
1283 If the number of valid pairs for (i, j) is < minp, the result is NaN.
1285 Returns:
1286 corr : ndarray, shape (K, K)
1287 Spearman correlation matrix.
1288 """
1289 N, K = mat.shape
1290 # trivial all-NaN answer if too few rows
1291 if N < minp:
1292 return np.full((K, K), np.nan, dtype=float)
1294 # mask of finite entries
1295 mask = np.isfinite(mat) # shape (N, K), dtype=bool
1297 # precompute ranks column-wise ignoring NaNs
1298 ranks = np.full((N, K), np.nan, dtype=float)
1299 for j in range(K):
1300 valid = mask[:, j]
1301 if valid.any():
1302 # rankdata by default gives average ranks for ties
1303 ranks[valid, j] = rankdata(mat[valid, j], method="average")
1305 # allocate result
1306 result = np.empty((K, K), dtype=float)
1308 # TODO: optimize in future iterations
1309 # loop lower triangle (including diagonal)
1310 for i in range(K):
1311 for j in range(i + 1):
1312 # find rows where both columns are finite
1313 valid = mask[:, i] & mask[:, j]
1314 nobs = valid.sum()
1316 if nobs < minp:
1317 rho = np.nan
1318 else:
1319 xi = ranks[valid, i]
1320 yj = ranks[valid, j]
1321 # subtract means
1322 xi = xi - xi.mean()
1323 yj = yj - yj.mean()
1324 num = np.dot(xi, yj)
1325 den = np.sqrt(np.dot(xi, xi) * np.dot(yj, yj))
1326 rho = num / den if den > 0 else np.nan
1328 result[i, j] = rho
1329 result[j, i] = rho
1331 return result
1333 @staticmethod
1334 def _weighting(fourier_fingerprint: np.ndarray) -> np.ndarray:
1335 """
1336 Performs weighting on the given correlation matrix.
1337 Here, low-frequent coefficients are weighted more heavily.
1339 Args:
1340 correlation (np.ndarray): Correlation matrix
1341 """
1342 # TODO: in Future iterations, this can be optimized by computing
1343 # on the trimmed matrix instead.
1345 assert (
1346 fourier_fingerprint.shape[0] % 2 != 0
1347 and fourier_fingerprint.shape[1] % 2 != 0
1348 ), "Correlation matrix must have odd dimensions. \
1349 Hint: use `trim` argument when calling `get_spectrum`."
1350 assert (
1351 fourier_fingerprint.shape[0] == fourier_fingerprint.shape[1]
1352 ), "Correlation matrix must be square."
1354 def quadrant_to_matrix(a: np.ndarray) -> np.ndarray:
1355 """
1356 Transforms [[1,2],[3,4]] to
1357 [[1,2,1],[3,4,3],[1,2,1]]
1359 Args:
1360 a (np.ndarray): _description_
1362 Returns:
1363 np.ndarray: _description_
1364 """
1365 # rotates a from [[1,2],[3,4]] to [[3,4],[1,2]]
1366 a_rot = np.rot90(a)
1367 # merge the two matrices
1368 left = np.concat([a, a_rot])
1369 # merges left and right (left flipped)
1370 b = np.concat(
1371 [left, np.flip(left)],
1372 axis=1,
1373 )
1374 # remove the middle column and row
1375 return np.delete(
1376 np.delete(b, (b.shape[0] // 2), axis=0), (b.shape[1] // 2), axis=1
1377 )
1379 nc = fourier_fingerprint.shape[0] // 2 + 1
1380 weights = nnp.mgrid[0:nc:1, 0:nc:1].sum(axis=0) / ((nc - 1) * 2)
1381 weights_matrix = quadrant_to_matrix(weights)
1383 return fourier_fingerprint * weights_matrix
1384 raise NotImplementedError("Weighting method is not implemented")