Coverage for qml_essentials/coefficients.py: 96%

391 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-12-22 11:59 +0000

1from __future__ import annotations 

2import math 

3from collections import defaultdict 

4from dataclasses import dataclass 

5import pennylane as qml 

6import pennylane.numpy as np 

7import numpy as nnp 

8from pennylane.operation import Operator 

9import pennylane.ops.op_math as qml_op 

10from scipy.stats import rankdata 

11from functools import reduce 

12from typing import List, Tuple, Optional, Any, Dict, Union 

13 

14from qml_essentials.model import Model 

15 

16import logging 

17 

18log = logging.getLogger(__name__) 

19 

20 

21class Coefficients: 

22 @staticmethod 

23 def get_spectrum( 

24 model: Model, 

25 mfs: int = 1, 

26 mts: int = 1, 

27 shift=False, 

28 trim=False, 

29 **kwargs, 

30 ) -> Tuple[np.ndarray, np.ndarray]: 

31 """ 

32 Extracts the coefficients of a given model using a FFT (np-fft). 

33 

34 Note that the coefficients are complex numbers, but the imaginary part 

35 of the coefficients should be very close to zero, since the expectation 

36 values of the Pauli operators are real numbers. 

37 

38 It can perform oversampling in both the frequency and time domain 

39 using the `mfs` and `mts` arguments. 

40 

41 Args: 

42 model (Model): The model to sample. 

43 mfs (int): Multiplicator for the highest frequency. Default is 2. 

44 mts (int): Multiplicator for the number of time samples. Default is 1. 

45 shift (bool): Whether to apply np-fftshift. Default is False. 

46 trim (bool): Whether to remove the Nyquist frequency if spectrum is even. 

47 Default is False. 

48 kwargs (Any): Additional keyword arguments for the model function. 

49 

50 Returns: 

51 Tuple[np.ndarray, np.ndarray]: Tuple containing the coefficients 

52 and frequencies. 

53 """ 

54 kwargs.setdefault("force_mean", True) 

55 kwargs.setdefault("execution_type", "expval") 

56 

57 coeffs, freqs = Coefficients._fourier_transform( 

58 model, mfs=mfs, mts=mts, **kwargs 

59 ) 

60 

61 if not np.isclose(np.sum(coeffs).imag, 0.0, rtol=1.0e-5): 

62 raise ValueError( 

63 f"Spectrum is not real. Imaginary part of coefficients is:\ 

64 {np.sum(coeffs).imag}" 

65 ) 

66 

67 if trim: 

68 for ax in range(model.n_input_feat): 

69 if coeffs.shape[ax] % 2 == 0: 

70 coeffs = np.delete(coeffs, len(coeffs) // 2, axis=ax) 

71 freqs = np.delete(freqs, len(freqs) // 2, axis=ax) 

72 

73 if shift: 

74 coeffs = np.fft.fftshift(coeffs, axes=list(range(model.n_input_feat))) 

75 freqs = np.fft.fftshift(freqs) 

76 

77 if len(freqs) == 1: 

78 freqs = freqs[0] 

79 

80 return coeffs, freqs 

81 

82 @staticmethod 

83 def _fourier_transform( 

84 model: Model, mfs: int, mts: int, **kwargs: Any 

85 ) -> np.ndarray: 

86 # Create a frequency vector with as many frequencies as model degrees, 

87 # oversampled by nfs 

88 n_freqs: np.ndarray = np.array( 

89 [2 * mfs * model.frequencies[i] + 1 for i in range(model.n_input_feat)] 

90 ) 

91 

92 start, stop, step = 0, 2 * mts * np.pi, 2 * np.pi / n_freqs 

93 # Stretch according to the number of frequencies 

94 inputs: List = [ 

95 np.arange(start, stop, step[i]) for i in range(model.n_input_feat) 

96 ] 

97 

98 # permute with input dimensionality 

99 nd_inputs = np.array( 

100 np.meshgrid(*[inputs[i] for i in range(model.n_input_feat)]) 

101 ).T.reshape(-1, model.n_input_feat) 

102 

103 # Output vector is not necessarily the same length as input 

104 outputs = model(inputs=nd_inputs, **kwargs) 

105 outputs = outputs.reshape( 

106 *[inputs[i].shape[0] for i in range(model.n_input_feat)], -1 

107 ).squeeze() 

108 

109 coeffs = np.fft.fftn(outputs, axes=list(range(model.n_input_feat))) 

110 

111 # TODO: in the future, this should take into account that there can be a 

112 # different number of frequencies per dimension 

113 freqs = [ 

114 np.fft.fftfreq(int(mts * n_freqs[i]), 1 / n_freqs[i]) 

115 for i in range(model.n_input_feat) 

116 ] 

117 # freqs = np.fft.fftfreq(mts * n_freqs, 1 / n_freqs) 

118 

119 # TODO: this could cause issues with multidim input 

120 # FIXME: account for different frequencies in multidim input scenarios 

121 # Run the fft and rearrange + 

122 # normalize the output (using product if multidim) 

123 return ( 

124 coeffs / np.prod(outputs.shape[0 : model.n_input_feat]), 

125 freqs, 

126 ) 

127 

128 @staticmethod 

129 def get_psd(coeffs: np.ndarray) -> np.ndarray: 

130 """ 

131 Calculates the power spectral density (PSD) from given Fourier coefficients. 

132 

133 Args: 

134 coeffs (np.ndarray): The Fourier coefficients. 

135 

136 Returns: 

137 np.ndarray: The power spectral density. 

138 """ 

139 # TODO: if we apply trim=True in advance, this will be slightly wrong.. 

140 

141 def abs2(x): 

142 return x.real**2 + x.imag**2 

143 

144 scale = 2.0 / (len(coeffs) ** 2) 

145 return scale * abs2(coeffs) 

146 

147 @staticmethod 

148 def evaluate_Fourier_series( 

149 coefficients: np.ndarray, 

150 frequencies: np.ndarray, 

151 inputs: Union[np.ndarray, list, float], 

152 ) -> float: 

153 """ 

154 Evaluate the function value of a Fourier series at one point. 

155 

156 Args: 

157 coefficients (np.ndarray): Coefficients of the Fourier series. 

158 frequencies (np.ndarray): Corresponding frequencies. 

159 inputs (np.ndarray): Point at which to evaluate the function. 

160 Returns: 

161 float: The function value at the input point. 

162 """ 

163 dims = len(coefficients.shape) 

164 

165 if not isinstance(inputs, (np.ndarray, list)): 

166 inputs = [inputs] 

167 

168 frequencies = np.stack(np.meshgrid(*frequencies)).T.reshape(-1, dims) 

169 freq_inputs = np.einsum("...j,j->...", frequencies, inputs) 

170 coeffs = coefficients.flatten() 

171 freq_inputs = freq_inputs.flatten() 

172 

173 exp = 0.0 

174 for omega_x, c in zip(freq_inputs, coeffs): 

175 exp += c * np.exp(1j * omega_x) 

176 

177 return np.real_if_close(exp) 

178 

179 

180class FourierTree: 

181 """ 

182 Sine-cosine tree representation for the algorithm by Nemkov et al. 

183 This tree can be used to obtain analytical Fourier coefficients for a given 

184 Pauli-Clifford circuit. 

185 """ 

186 

187 class CoefficientsTreeNode: 

188 """ 

189 Representation of a node in the coefficients tree for the algorithm by 

190 Nemkov et al. 

191 """ 

192 

193 def __init__( 

194 self, 

195 parameter_idx: Optional[int], 

196 observable: FourierTree.PauliOperator, 

197 is_sine_factor: bool, 

198 is_cosine_factor: bool, 

199 left: Optional[FourierTree.CoefficientsTreeNode] = None, 

200 right: Optional[FourierTree.CoefficientsTreeNode] = None, 

201 ): 

202 """ 

203 Coefficient tree node initialisation. Each node has information about 

204 its creation context and it's children, i.e.: 

205 

206 Args: 

207 parameter_idx (Optional[int]): Index of the corresp. param. index i. 

208 observable (FourierTree.PauliOperator): The nodes observable to 

209 obtain the expectation value that contributes to the constant 

210 term. 

211 is_sine_factor (bool): If this node belongs to a sine coefficient. 

212 is_cosine_factor (bool): If this node belongs to a cosine coefficient. 

213 left (Optional[CoefficientsTreeNode]): left child (if any). 

214 right (Optional[CoefficientsTreeNode]): right child (if any). 

215 """ 

216 self.parameter_idx = parameter_idx 

217 

218 assert not ( 

219 is_sine_factor and is_cosine_factor 

220 ), "Cannot be sine and cosine at the same time" 

221 self.is_sine_factor = is_sine_factor 

222 self.is_cosine_factor = is_cosine_factor 

223 

224 # If the observable does not constist of only Z and I, the 

225 # expectation (and therefore the constant node term) is zero 

226 if np.logical_or( 

227 observable.list_repr == 0, observable.list_repr == 1 

228 ).any(): 

229 self.term = 0.0 

230 else: 

231 self.term = observable.phase 

232 

233 self.left = left 

234 self.right = right 

235 

236 def evaluate(self, parameters: list[float]) -> float: 

237 """ 

238 Recursive function to evaluate the expectation of the coefficient tree, 

239 starting from the current node. 

240 

241 Args: 

242 parameters (list[float]): The parameters, by which the circuit (and 

243 therefore the tree) is parametrised. 

244 

245 Returns: 

246 float: The expectation for the current node and it's children. 

247 """ 

248 factor = ( 

249 parameters[self.parameter_idx] 

250 if self.parameter_idx is not None 

251 else 1.0 

252 ) 

253 if self.is_sine_factor: 

254 factor = 1j * np.sin(factor) 

255 elif self.is_cosine_factor: 

256 factor = np.cos(factor) 

257 if not (self.left or self.right): # leaf 

258 return factor * self.term 

259 

260 sum_children = 0.0 

261 if self.left: 

262 left = self.left.evaluate(parameters) 

263 sum_children = sum_children + left 

264 if self.right: 

265 right = self.right.evaluate(parameters) 

266 sum_children = sum_children + right 

267 

268 return factor * sum_children 

269 

270 def get_leafs( 

271 self, 

272 sin_list: List[int], 

273 cos_list: List[int], 

274 existing_leafs: List[FourierTree.TreeLeaf] = [], 

275 ) -> List[FourierTree.TreeLeaf]: 

276 """ 

277 Traverse the tree starting from the current node, to obtain the tree 

278 leafs only. 

279 The leafs correspond to the terms in the sine-cosine tree 

280 representation that eventually are used to obtain coefficients and 

281 frequencies. 

282 Sine and cosine lists are recursively passed to the children until a 

283 leaf is reached (top to bottom). 

284 Leafs are then passed bottom to top to the caller. 

285 

286 Args: 

287 sin_list (List[int]): Current number of sine contributions for each 

288 parameter. Has the same length as the parameters, as each 

289 position corresponds to one parameter. 

290 cos_list (List[int]): Current number of cosine contributions for 

291 each parameter. Has the same length as the parameters, as each 

292 position corresponds to one parameter. 

293 existing_leafs (List[TreeLeaf]): Current list of leaf nodes from 

294 parents. 

295 

296 Returns: 

297 List[TreeLeaf]: Updated list of leaf nodes. 

298 """ 

299 

300 if self.is_sine_factor: 

301 sin_list[self.parameter_idx] += 1 

302 if self.is_cosine_factor: 

303 cos_list[self.parameter_idx] += 1 

304 

305 if not (self.left or self.right): # leaf 

306 if self.term != 0.0: 

307 return [FourierTree.TreeLeaf(sin_list, cos_list, self.term)] 

308 else: 

309 return [] 

310 

311 if self.left: 

312 leafs_left = self.left.get_leafs( 

313 sin_list.copy(), cos_list.copy(), existing_leafs.copy() 

314 ) 

315 else: 

316 leafs_left = [] 

317 

318 if self.right: 

319 leafs_right = self.right.get_leafs( 

320 sin_list.copy(), cos_list.copy(), existing_leafs.copy() 

321 ) 

322 else: 

323 leafs_right = [] 

324 

325 existing_leafs.extend(leafs_left) 

326 existing_leafs.extend(leafs_right) 

327 return existing_leafs 

328 

329 @dataclass 

330 class TreeLeaf: 

331 """ 

332 Coefficient tree leafs according to the algorithm by Nemkov et al., which 

333 correspond to the terms in the sine-cosine tree representation that 

334 eventually are used to obtain coefficients and frequencies. 

335 

336 Args: 

337 sin_indices (List[int]): Current number of sine contributions for each 

338 parameter. Has the same length as the parameters, as each 

339 position corresponds to one parameter. 

340 cos_list (List[int]): Current number of cosine contributions for 

341 each parameter. Has the same length as the parameters, as each 

342 position corresponds to one parameter. 

343 term (np.complex): Constant factor of the leaf, depending on the 

344 expectation value of the observable, and a phase. 

345 """ 

346 

347 sin_indices: List[int] 

348 cos_indices: List[int] 

349 term: np.complex128 

350 

351 class PauliOperator: 

352 """ 

353 Utility class for storing Pauli Rotations, the corresponding indices 

354 in the XY-Space (whether there is a gate with X or Y generator at a 

355 certain qubit) and the phase. 

356 

357 Args: 

358 pauli (Union[Operator, np.ndarray[int]]): Pauli Rotation Operation 

359 or list representation 

360 n_qubits (int): Number of qubits in the circuit 

361 prev_xy_indices (Optional[np.ndarray[bool]]): X/Y indices of the 

362 previous Pauli sequence. Defaults to None. 

363 is_observable (bool): If the operator is an observable. Defaults to 

364 False. 

365 is_init (bool): If this Pauli operator is initialised the first 

366 time. Defaults to True. 

367 phase (float): Phase of the operator. Defaults to 1.0 

368 """ 

369 

370 def __init__( 

371 self, 

372 pauli: Union[Operator, np.ndarray[int]], 

373 n_qubits: int, 

374 prev_xy_indices: Optional[np.ndarray[bool]] = None, 

375 is_observable: bool = False, 

376 is_init: bool = True, 

377 phase: float = 1.0, 

378 ): 

379 self.is_observable = is_observable 

380 self.phase = phase 

381 

382 if is_init: 

383 if not is_observable: 

384 pauli = pauli.generator()[0].base 

385 self.list_repr = self._create_list_representation(pauli, n_qubits) 

386 else: 

387 assert isinstance(pauli, np.ndarray) 

388 self.list_repr = pauli 

389 

390 if prev_xy_indices is None: 

391 prev_xy_indices = np.zeros(n_qubits, dtype=bool) 

392 self.xy_indices = np.logical_or( 

393 prev_xy_indices, 

394 self._compute_xy_indices(self.list_repr, rev=is_observable), 

395 ) 

396 

397 @staticmethod 

398 def _compute_xy_indices( 

399 op: np.ndarray[int], rev: bool = False 

400 ) -> np.ndarray[bool]: 

401 """ 

402 Computes the positions of X or Y gates to an one-hot encoded boolen 

403 array. 

404 

405 Args: 

406 op (np.ndarray[int]): Pauli-Operation list representation. 

407 rev (bool): Whether to negate the array. 

408 

409 Returns: 

410 np.ndarray[bool]: One hot encoded boolean array. 

411 """ 

412 xy_indices = (op == 0) + (op == 1) 

413 if rev: 

414 xy_indices = ~xy_indices 

415 return xy_indices 

416 

417 @staticmethod 

418 def _create_list_representation(op: Operator, n_qubits: int) -> np.ndarray[int]: 

419 """ 

420 Create list representation of a Pennylane Operator. 

421 Generally, the list representation is a list of length n_qubits, 

422 where at each position a Pauli Operator is encoded as such: 

423 I: -1 

424 X: 0 

425 Y: 1 

426 Z: 2 

427 

428 Args: 

429 op (Operator): Pennylane Operator 

430 n_qubits (int): number of qubits in the circuit 

431 

432 Returns: 

433 np.ndarray[int]: List representation 

434 """ 

435 pauli_repr = -np.ones(n_qubits, dtype=int) 

436 op = op.terms()[1][0] if isinstance(op, qml_op.Prod) else op 

437 op = op.base if isinstance(op, qml_op.SProd) else op 

438 

439 if isinstance(op, qml.PauliX): 

440 pauli_repr[op.wires[0]] = 0 

441 elif isinstance(op, qml.PauliY): 

442 pauli_repr[op.wires[0]] = 1 

443 elif isinstance(op, qml.PauliZ): 

444 pauli_repr[op.wires[0]] = 2 

445 else: 

446 for o in op: 

447 if isinstance(o, qml.PauliX): 

448 pauli_repr[o.wires[0]] = 0 

449 elif isinstance(o, qml.PauliY): 

450 pauli_repr[o.wires[0]] = 1 

451 elif isinstance(o, qml.PauliZ): 

452 pauli_repr[o.wires[0]] = 2 

453 return pauli_repr 

454 

455 def is_commuting(self, pauli: np.ndarray[int]) -> bool: 

456 """ 

457 Computes if this Pauli commutes with another Pauli operator. 

458 This computation is based on the fact that The commutator is zero 

459 if and only if the number of anticommuting single-qubit Paulis is 

460 even. 

461 

462 Args: 

463 pauli (np.ndarray[int]): List representation of another Pauli 

464 

465 Returns: 

466 bool: If the current and other Pauli are commuting. 

467 """ 

468 anticommutator = np.where( 

469 pauli < 0, 

470 False, 

471 np.where( 

472 self.list_repr < 0, 

473 False, 

474 np.where(self.list_repr == pauli, False, True), 

475 ), 

476 ) 

477 return not (np.sum(anticommutator) % 2) 

478 

479 def tensor(self, pauli: np.ndarray[int]) -> FourierTree.PauliOperator: 

480 """ 

481 Compute tensor product between the current Pauli and a given list 

482 representation of another Pauli operator. 

483 

484 Args: 

485 pauli (np.ndarray[int]): List representation of Pauli 

486 

487 Returns: 

488 FourierTree.PauliOperator: New Pauli operator object, which 

489 contains the tensor product 

490 """ 

491 diff = (pauli - self.list_repr + 3) % 3 

492 phase = np.where( 

493 self.list_repr < 0, 

494 1.0, 

495 np.where( 

496 pauli < 0, 

497 1.0, 

498 np.where( 

499 diff == 2, 

500 1.0j, 

501 np.where(diff == 1, -1.0j, 1.0), 

502 ), 

503 ), 

504 ) 

505 

506 obs = np.where( 

507 self.list_repr < 0, 

508 pauli, 

509 np.where( 

510 pauli < 0, 

511 self.list_repr, 

512 np.where( 

513 diff == 2, 

514 (self.list_repr + 1) % 3, 

515 np.where(diff == 1, (self.list_repr + 2) % 3, -1), 

516 ), 

517 ), 

518 ) 

519 phase = self.phase * np.prod(phase) 

520 return FourierTree.PauliOperator( 

521 obs, phase=phase, n_qubits=obs.size, is_init=False, is_observable=True 

522 ) 

523 

524 def __init__(self, model: Model, inputs=1.0): 

525 """ 

526 Tree initialisation, based on the Pauli-Clifford representation of a model. 

527 Currently, only one input feature is supported. 

528 

529 **Usage**: 

530 ``` 

531 # initialise a model 

532 model = Model(...) 

533 

534 # initialise and build FourierTree 

535 tree = FourierTree(model) 

536 

537 # get expectaion value 

538 exp = tree() 

539 

540 # Get spectrum (for each observable, we have one list element) 

541 coeff_list, freq_list = tree.spectrum() 

542 ``` 

543 

544 Args: 

545 model (Model): The Model, for which to build the tree 

546 inputs (bool, optional): Possible default inputs. Defaults to 1.0. 

547 """ 

548 self.model = model 

549 self.tree_roots = None 

550 

551 if not model.as_pauli_circuit: 

552 model.as_pauli_circuit = True 

553 

554 inputs = ( 

555 self.model._inputs_validation(inputs) 

556 if inputs is not None 

557 else self.model._inputs_validation(1.0) 

558 ) 

559 inputs.requires_grad = False 

560 

561 quantum_tape = qml.workflow.construct_tape(self.model.circuit)( 

562 params=model.params, inputs=inputs 

563 ) 

564 self.parameters = [np.squeeze(p) for p in quantum_tape.get_parameters()] 

565 self.input_indices = [ 

566 i for (i, p) in enumerate(self.parameters) if not p.requires_grad 

567 ] 

568 

569 self.observables = self._encode_observables(quantum_tape.observables) 

570 

571 pauli_rot = FourierTree.PauliOperator( 

572 quantum_tape.operations[0], 

573 self.model.n_qubits, 

574 ) 

575 self.pauli_rotations = [pauli_rot] 

576 for op in quantum_tape.operations[1:]: 

577 pauli_rot = FourierTree.PauliOperator( 

578 op, self.model.n_qubits, pauli_rot.xy_indices 

579 ) 

580 self.pauli_rotations.append(pauli_rot) 

581 

582 self.tree_roots = self.build() 

583 self.leafs: List[List[FourierTree.TreeLeaf]] = self._get_tree_leafs() 

584 

585 def __call__( 

586 self, 

587 params: Optional[np.ndarray] = None, 

588 inputs: Optional[np.ndarray] = None, 

589 **kwargs, 

590 ) -> np.ndarray: 

591 """ 

592 Evaluates the Fourier tree via sine-cosine terms sum. This is 

593 equivalent to computing the expectation value of the observables with 

594 respect to the corresponding circuit. 

595 

596 Args: 

597 params (Optional[np.ndarray], optional): Parameters of the model. 

598 Defaults to None. 

599 inputs (Optional[np.ndarray], optional): Inputs to the circuit. 

600 Defaults to None. 

601 

602 Returns: 

603 np.ndarray: Expectation value of the tree. 

604 

605 Raises: 

606 NotImplementedError: When using other "execution_type" as expval. 

607 NotImplementedError: When using "noise_params" 

608 

609 

610 """ 

611 params = ( 

612 self.model._params_validation(params) 

613 if params is not None 

614 else self.model.params 

615 ) 

616 inputs = ( 

617 self.model._inputs_validation(inputs) 

618 if inputs is not None 

619 else self.model._inputs_validation(1.0) 

620 ) 

621 inputs.requires_grad = False 

622 

623 if kwargs.get("execution_type", "expval") != "expval": 

624 raise NotImplementedError( 

625 f'Currently, only "expval" execution type is supported when ' 

626 f"building FourierTree. Got {kwargs.get('execution_type', 'expval')}." 

627 ) 

628 if kwargs.get("noise_params", None) is not None: 

629 raise NotImplementedError( 

630 "Currently, noise is not supported when building FourierTree." 

631 ) 

632 

633 quantum_tape = qml.workflow.construct_tape(self.model.circuit)( 

634 params=self.model.params, inputs=inputs 

635 ) 

636 self.parameters = [np.squeeze(p) for p in quantum_tape.get_parameters()] 

637 self.input_indices = [ 

638 i for (i, p) in enumerate(self.parameters) if not p.requires_grad 

639 ] 

640 

641 results = np.zeros(len(self.tree_roots)) 

642 for i, root in enumerate(self.tree_roots): 

643 results[i] = np.real_if_close(root.evaluate(self.parameters)) 

644 

645 if kwargs.get("force_mean", False): 

646 return np.mean(results) 

647 else: 

648 return results 

649 

650 def build(self) -> List[CoefficientsTreeNode]: 

651 """ 

652 Creates the coefficient tree, i.e. it creates and initialises the tree 

653 nodes. 

654 Leafs can be obtained separately in _get_tree_leafs, once the tree is 

655 set up. 

656 

657 Returns: 

658 List[CoefficientsTreeNode]: The list of root nodes (one root for 

659 each observable). 

660 """ 

661 tree_roots = [] 

662 pauli_rotation_idx = len(self.pauli_rotations) - 1 

663 for obs in self.observables: 

664 root = self._create_tree_node(obs, pauli_rotation_idx) 

665 tree_roots.append(root) 

666 return tree_roots 

667 

668 def _encode_observables( 

669 self, tape_obs: List[Operator] 

670 ) -> List[FourierTree.PauliOperator]: 

671 """ 

672 Encodes Pennylane observables from tape as FourierTree.PauliOperator 

673 utility objects. 

674 

675 Args: 

676 tape_obs (List[Operator]): Pennylane tape operations 

677 

678 Returns: 

679 List[FourierTree.PauliOperator]: List of Pauli operators 

680 """ 

681 observables = [] 

682 for obs in tape_obs: 

683 observables.append( 

684 FourierTree.PauliOperator(obs, self.model.n_qubits, is_observable=True) 

685 ) 

686 return observables 

687 

688 def _get_tree_leafs(self) -> List[List[TreeLeaf]]: 

689 """ 

690 Obtain all Leaf Nodes with its sine- and cosine terms. 

691 

692 Returns: 

693 List[List[TreeLeaf]]: For each observable (root), the list of leaf 

694 nodes. 

695 """ 

696 leafs = [] 

697 for root in self.tree_roots: 

698 sin_list = np.zeros(len(self.parameters), dtype=np.int32) 

699 cos_list = np.zeros(len(self.parameters), dtype=np.int32) 

700 leafs.append(root.get_leafs(sin_list, cos_list, [])) 

701 return leafs 

702 

703 def get_spectrum( 

704 self, force_mean: bool = False 

705 ) -> Tuple[List[np.ndarray], List[np.ndarray]]: 

706 """ 

707 Computes the Fourier spectrum for the tree, consisting of the 

708 frequencies and its corresponding coefficinets. 

709 If the frag force_mean was set in the constructor, the mean coefficient 

710 over all observables (roots) are computed. 

711 

712 Args: 

713 force_mean (bool, optional): Whether to average over multiple 

714 observables. Defaults to False. 

715 

716 Returns: 

717 Tuple[List[np.ndarray], List[np.ndarray]]: 

718 - List of frequencies, one list for each observable (root). 

719 - List of corresponding coefficents, one list for each 

720 observable (root). 

721 """ 

722 parameter_indices = [ 

723 i for i in range(len(self.parameters)) if i not in self.input_indices 

724 ] 

725 

726 coeffs = [] 

727 for leafs in self.leafs: 

728 freq_terms = defaultdict(np.complex128) 

729 for leaf in leafs: 

730 leaf_factor, s, c = self._compute_leaf_factors(leaf, parameter_indices) 

731 

732 for a in range(s + 1): 

733 for b in range(c + 1): 

734 comb = math.comb(s, a) * math.comb(c, b) * (-1) ** (s - a) 

735 freq_terms[2 * a + 2 * b - s - c] += comb * leaf_factor 

736 

737 coeffs.append(freq_terms) 

738 

739 frequencies, coefficients = self._freq_terms_to_coeffs(coeffs, force_mean) 

740 return coefficients, frequencies 

741 

742 def _freq_terms_to_coeffs( 

743 self, coeffs: List[Dict[int, np.ndarray]], force_mean: bool 

744 ) -> Tuple[List[np.ndarray], List[np.ndarray]]: 

745 """ 

746 Given a list of dictionaries of the form: 

747 [ 

748 { 

749 freq_obs1_1: coeff1, 

750 freq_obs1_2: coeff2, 

751 ... 

752 }, 

753 { 

754 freq_obs2_1: coeff3, 

755 freq_obs2_2: coeff4, 

756 ... 

757 } 

758 ... 

759 ], 

760 Compute two separate lists of frequencies and coefficients. 

761 such that: 

762 freqs: [ 

763 [freq_obs1_1, freq_obs1_1, ...], 

764 [freq_obs2_1, freq_obs2_1, ...], 

765 ... 

766 ] 

767 coeffs: [ 

768 [coeff1, coeff2, ...], 

769 [coeff3, coeff4, ...], 

770 ... 

771 ] 

772 

773 If force_mean is set length of the resulting frequency and coefficent 

774 list is 1. 

775 

776 Args: 

777 coeffs (List[Dict[int, np.ndarray]]): Frequency->Coefficients 

778 dictionary list, one dict for each observable (root). 

779 force_mean (bool): Whether to average coefficients over multiple 

780 observables. 

781 

782 Returns: 

783 Tuple[List[np.ndarray], List[np.ndarray]]: 

784 - List of frequencies, one list for each observable (root). 

785 - List of corresponding coefficents, one list for each 

786 observable (root). 

787 """ 

788 frequencies = [] 

789 coefficients = [] 

790 if force_mean: 

791 all_freqs = sorted(set([f for c in coeffs for f in c.keys()])) 

792 coefficients.append( 

793 np.array([np.mean([c.get(f, 0.0) for c in coeffs]) for f in all_freqs]) 

794 ) 

795 frequencies.append(np.array(all_freqs)) 

796 else: 

797 for freq_terms in coeffs: 

798 freq_terms = dict(sorted(freq_terms.items())) 

799 frequencies.append(np.array(list(freq_terms.keys()))) 

800 coefficients.append(np.array(list(freq_terms.values()))) 

801 return frequencies, coefficients 

802 

803 def _compute_leaf_factors( 

804 self, leaf: TreeLeaf, parameter_indices: List[int] 

805 ) -> Tuple[float, int, int]: 

806 """ 

807 Computes the constant coefficient factor for each leaf. 

808 Additionally sine and cosine contributions of the input parameters for 

809 this leaf are returned, which are required to obtain the corresponding 

810 frequencies. 

811 

812 Args: 

813 leaf (TreeLeaf): The leaf for which to compute the factor. 

814 parameter_indices (List[int]): Variational parameter indices. 

815 

816 Returns: 

817 Tuple[float, int, int]: 

818 - float: the constant factor for the leaf 

819 - int: number of sine contributions of the input 

820 - int: number of cosine contributions of the input 

821 """ 

822 leaf_factor = 1.0 

823 for i in parameter_indices: 

824 interm_factor = ( 

825 np.cos(self.parameters[i]) ** leaf.cos_indices[i] 

826 * (1j * np.sin(self.parameters[i])) ** leaf.sin_indices[i] 

827 ) 

828 leaf_factor = leaf_factor * interm_factor 

829 

830 # Get number of sine and cosine factors to which the input contributes 

831 c = np.sum([leaf.cos_indices[k] for k in self.input_indices], dtype=np.int32) 

832 s = np.sum([leaf.sin_indices[k] for k in self.input_indices], dtype=np.int32) 

833 

834 leaf_factor = leaf.term * leaf_factor * 0.5 ** (s + c) 

835 

836 return leaf_factor, s, c 

837 

838 def _early_stopping_possible( 

839 self, pauli_rotation_idx: int, observable: FourierTree.PauliOperator 

840 ): 

841 """ 

842 Checks if a node for an observable can be discarded as all expecation 

843 values that can result through further branching are zero. 

844 The method is mentioned in the paper by Nemkov et al.: If the one-hot 

845 encoded indices for X/Y operations in the Pauli-rotation generators are 

846 a basis for that of the observable, the node must be processed further. 

847 If not, it can be discarded. 

848 

849 Args: 

850 pauli_rotation_idx (int): Index of remaining Pauli rotation gates. 

851 Gates itself are attributes of the class. 

852 observable (FourierTree.PauliOperator): Current observable 

853 """ 

854 xy_indices_obs = np.logical_or( 

855 observable.xy_indices, self.pauli_rotations[pauli_rotation_idx].xy_indices 

856 ).all() 

857 

858 return not xy_indices_obs 

859 

860 def _create_tree_node( 

861 self, 

862 observable: FourierTree.PauliOperator, 

863 pauli_rotation_idx: int, 

864 parameter_idx: Optional[int] = None, 

865 is_sine: bool = False, 

866 is_cosine: bool = False, 

867 ) -> Optional[CoefficientsTreeNode]: 

868 """ 

869 Builds the Fourier-Tree according to the algorithm by Nemkov et al. 

870 

871 Args: 

872 observable (FourierTree.PauliOperator): Current observable 

873 pauli_rotation_idx (int): Index of remaining Pauli rotation gates. 

874 Gates itself are attributes of the class. 

875 parameter_idx (Optional[int]): Index of the current parameter. 

876 Parameters itself are attributes of the class. 

877 is_sine (bool): If the current node is a sine (left) node. 

878 is_cosine (bool): If the current node is a cosine (right) node. 

879 

880 Returns: 

881 Optional[CoefficientsTreeNode]: The resulting node. Children are set 

882 recursively. The top level receives the tree root. 

883 """ 

884 if self._early_stopping_possible(pauli_rotation_idx, observable): 

885 return None 

886 

887 # remove commuting paulis 

888 while pauli_rotation_idx >= 0: 

889 last_pauli = self.pauli_rotations[pauli_rotation_idx] 

890 if not observable.is_commuting(last_pauli.list_repr): 

891 break 

892 pauli_rotation_idx -= 1 

893 else: # leaf 

894 return FourierTree.CoefficientsTreeNode( 

895 parameter_idx, observable, is_sine, is_cosine 

896 ) 

897 

898 last_pauli = self.pauli_rotations[pauli_rotation_idx] 

899 

900 left = self._create_tree_node( 

901 observable, 

902 pauli_rotation_idx - 1, 

903 pauli_rotation_idx, 

904 is_cosine=True, 

905 ) 

906 

907 next_observable = self._create_new_observable(last_pauli.list_repr, observable) 

908 right = self._create_tree_node( 

909 next_observable, 

910 pauli_rotation_idx - 1, 

911 pauli_rotation_idx, 

912 is_sine=True, 

913 ) 

914 

915 return FourierTree.CoefficientsTreeNode( 

916 parameter_idx, 

917 observable, 

918 is_sine, 

919 is_cosine, 

920 left, 

921 right, 

922 ) 

923 

924 def _create_new_observable( 

925 self, pauli: np.ndarray[int], observable: FourierTree.PauliOperator 

926 ) -> FourierTree.PauliOperator: 

927 """ 

928 Utility function to obtain the new observable for a tree node, if the 

929 last Pauli and the observable do not commute. 

930 

931 Args: 

932 pauli (np.ndarray[int]): The int array representation of the last 

933 Pauli rotation in the operation sequence. 

934 observable (FourierTree.PauliOperator): The current observable. 

935 

936 Returns: 

937 FourierTree.PauliOperator: The new observable. 

938 """ 

939 observable = observable.tensor(pauli) 

940 return observable 

941 

942 

943class FCC: 

944 @staticmethod 

945 def get_fcc( 

946 model: Model, 

947 n_samples: int, 

948 seed: int, 

949 method: Optional[str] = "pearson", 

950 scale: Optional[bool] = False, 

951 weight: Optional[bool] = False, 

952 trim_redundant: Optional[bool] = True, 

953 **kwargs, 

954 ) -> float: 

955 """ 

956 Shortcut method to get just the FCC. 

957 This includes 

958 1. What is done in `get_fourier_fingerprint`: 

959 1. Calculating the coefficients (using `n_samples` and `seed`) 

960 2. Correlating the result from 1) using `method` 

961 3. Weighting the correlation matrix (if `weight` is True) 

962 4. Remove redundancies 

963 2. What is done in `calculate_fcc`: 

964 1. Absolute of the fingerprint 

965 2. Average 

966 

967 Args: 

968 model (Model): The QFM model 

969 n_samples (int): Number of samples to calculate average of coefficients 

970 seed (int): Seed to initialize random parameters 

971 method (Optional[str], optional): Correlation method. Defaults to "pearson". 

972 scale (Optional[bool], optional): Whether to scale the number of samples. 

973 Defaults to False. 

974 weight (Optional[bool], optional): Whether to weight the correlation matrix. 

975 Defaults to False. 

976 trim_redundant (Optional[bool], optional): Whether to remove redundant 

977 correlations. Defaults to False. 

978 **kwargs: Additional keyword arguments for the model function. 

979 

980 Returns: 

981 float: The FCC 

982 """ 

983 fourier_fingerprint, _ = FCC.get_fourier_fingerprint( 

984 model, 

985 n_samples, 

986 seed, 

987 method, 

988 scale, 

989 weight, 

990 trim_redundant=trim_redundant, 

991 **kwargs, 

992 ) 

993 

994 return FCC.calculate_fcc(fourier_fingerprint) 

995 

996 def get_fourier_fingerprint( 

997 model: Model, 

998 n_samples: int, 

999 seed: int, 

1000 method: Optional[str] = "pearson", 

1001 scale: Optional[bool] = False, 

1002 weight: Optional[bool] = False, 

1003 trim_redundant: Optional[bool] = True, 

1004 **kwargs, 

1005 ) -> Tuple[np.ndarray, np.ndarray]: 

1006 """ 

1007 Shortcut method to get just the fourier fingerprint. 

1008 This includes 

1009 1. Calculating the coefficients (using `n_samples` and `seed`) 

1010 2. Correlating the result from 1) using `method` 

1011 3. Weighting the correlation matrix (if `weight` is True) 

1012 4. Remove redundancies (if `trim_redundant` is True) 

1013 

1014 Args: 

1015 model (Model): The QFM model 

1016 n_samples (int): Number of samples to calculate average of coefficients 

1017 seed (int): Seed to initialize random parameters 

1018 method (Optional[str], optional): Correlation method. Defaults to "pearson". 

1019 scale (Optional[bool], optional): Whether to scale the number of samples. 

1020 Defaults to False. 

1021 weight (Optional[bool], optional): Whether to weight the correlation matrix. 

1022 Defaults to False. 

1023 trim_redundant (Optional[bool], optional): Whether to remove redundant 

1024 correlations. Defaults to True. 

1025 **kwargs: Additional keyword arguments for the model function. 

1026 

1027 Returns: 

1028 Tuple[np.ndarray, np.ndarray]: The fourier fingerprint 

1029 and the frequency indices 

1030 """ 

1031 _, coeffs, freqs = FCC._calculate_coefficients( 

1032 model, n_samples, seed, scale, **kwargs 

1033 ) 

1034 fourier_fingerprint = FCC._correlate(coeffs.transpose(), method=method) 

1035 

1036 # perform weighting if requested 

1037 fourier_fingerprint = ( 

1038 FCC._weighting(fourier_fingerprint) if weight else fourier_fingerprint 

1039 ) 

1040 

1041 if trim_redundant: 

1042 mask = FCC._calculate_mask(freqs) 

1043 

1044 # apply the mask on the fingerprint 

1045 fourier_fingerprint = mask * fourier_fingerprint 

1046 

1047 row_mask = np.any(np.isfinite(fourier_fingerprint), axis=1) 

1048 col_mask = np.any(np.isfinite(fourier_fingerprint), axis=0) 

1049 

1050 fourier_fingerprint = fourier_fingerprint[row_mask][:, col_mask] 

1051 

1052 return fourier_fingerprint, freqs 

1053 

1054 @staticmethod 

1055 def calculate_fcc( 

1056 fourier_fingerprint: np.ndarray, 

1057 ) -> float: 

1058 """ 

1059 Method to calculate the FCC based on an existing correlation matrix. 

1060 Calculate absolute and then the average over this matrix. 

1061 The Fingerprint can be obtained via `get_fourier_fingerprint` 

1062 

1063 Args: 

1064 coeff_coeff_correlation (np.ndarray): Correlation matrix of coefficients 

1065 Returns: 

1066 float: The FCC 

1067 """ 

1068 # apply the mask on the fingerprint 

1069 return np.nanmean(np.abs(fourier_fingerprint)) 

1070 

1071 def _calculate_mask(freqs: np.ndarray) -> np.ndarray: 

1072 """ 

1073 Method to calculate a mask filtering out redundant elements 

1074 of the Fourier correlation matrix, based on the provided frequency vector. 

1075 It does so by 'simulating' the operations that would be performed 

1076 by `_correlate`. 

1077 

1078 Args: 

1079 freqs (np.ndarray): Array of frequencies 

1080 

1081 Returns: 

1082 np.ndarray: The mask 

1083 """ 

1084 # TODO: this part can be heavily optimized, by e.g. using a "positive_only" 

1085 # flag when calculating the coefficients. 

1086 # However this would change the numerical values 

1087 # (while the order should be still the same). 

1088 

1089 # disregard all the negativ frequencies 

1090 freqs[freqs < 0] = np.nan 

1091 # compute the outer product of the frequency vectors for arbitrary dimensions 

1092 # or just use the existing frequency vector if it is 1D 

1093 nd_freqs = ( 

1094 reduce(np.multiply, np.ix_(*freqs)) if len(freqs.shape) > 1 else freqs 

1095 ) 

1096 # TODO: could prevent this if we're not using .squeeze().. 

1097 

1098 # "simulate" what would happen on correlating the coefficients 

1099 corr_freqs = np.outer(nd_freqs, nd_freqs) 

1100 # mask all frequencies that are nan now 

1101 # (i.e. all correlations with a negative frequency component) 

1102 corr_mask = np.where(np.isnan(corr_freqs), corr_freqs, 1) 

1103 # from this, disregard all the other redundant correlations (i.e. c_0_1 = c_1_0) 

1104 corr_mask[np.triu_indices(corr_mask.shape[0], 0)] = np.nan 

1105 

1106 return corr_mask 

1107 

1108 @staticmethod 

1109 def _calculate_coefficients( 

1110 model: Model, 

1111 n_samples: int, 

1112 seed: int, 

1113 scale: bool = False, 

1114 **kwargs, 

1115 ) -> Tuple[np.ndarray, np.ndarray]: 

1116 """ 

1117 Calculates the Fourier coefficients of a given model 

1118 using `n_samples` and `seed`. 

1119 Optionally, `noise_params` can be passed to perform noisy simulation. 

1120 

1121 Args: 

1122 model (Model): The QFM model 

1123 n_samples (int): Number of samples to calculate average of coefficients 

1124 seed (int): Seed to initialize random parameters 

1125 scale (bool, optional): Whether to scale the number of samples. 

1126 Defaults to False. 

1127 **kwargs: Additional keyword arguments for the model function. 

1128 

1129 Returns: 

1130 Tuple[np.ndarray, np.ndarray]: Parameters and Coefficients of size NxK 

1131 """ 

1132 if n_samples > 0: 

1133 if scale: 

1134 total_samples = int( 

1135 np.power(2, model.n_qubits) * n_samples * model.n_input_feat 

1136 ) 

1137 log.info(f"Using {total_samples} samples.") 

1138 else: 

1139 total_samples = n_samples 

1140 rng = np.random.default_rng(seed) 

1141 model.initialize_params(rng=rng, repeat=total_samples) 

1142 else: 

1143 total_samples = 1 

1144 

1145 coeffs, freqs = Coefficients.get_spectrum( 

1146 model, shift=True, trim=True, **kwargs 

1147 ) 

1148 

1149 return model.params, coeffs, freqs 

1150 

1151 @staticmethod 

1152 def _correlate(mat: np.ndarray, method: str = "pearson") -> np.ndarray: 

1153 """ 

1154 Correlates two arrays using `method`. 

1155 Currently, `pearson` and `spearman` are supported. 

1156 

1157 Args: 

1158 mat (np.ndarray): Array of shape (N, K) 

1159 method (str, optional): Correlation method. Defaults to "pearson". 

1160 

1161 Raises: 

1162 ValueError: If the method is not supported. 

1163 

1164 Returns: 

1165 np.ndarray: Correlation matrix of `a` and `b`. 

1166 """ 

1167 assert len(mat.shape) >= 2, "Input matrix must have at least 2 dimensions" 

1168 

1169 # Note that for the general n-D case, we have to flatten along 

1170 # the first axis (last one is batch). 

1171 # Note that the order here is important so we can easily filter out 

1172 # negative coefficients later. 

1173 # Consider the following example: [[1,2,3],[4,5,6],[7,8,9]] 

1174 # we want to get [1, 4, 7, 2, 5, 8, 3, 6, 9] 

1175 # such that after correlation, all positive indexed coefficients 

1176 # will be in the bottom right quadrant 

1177 if method == "pearson": 

1178 result = FCC._pearson(mat.reshape(mat.shape[0], -1)) 

1179 # result = FCC._pearson(mat.reshape(mat.shape[-1], -1, order="F")) 

1180 elif method == "spearman": 

1181 result = FCC._spearman(mat.reshape(mat.shape[0], -1)) 

1182 # result = FCC._spearman(mat.reshape(mat.shape[-1], -1, order="F")) 

1183 else: 

1184 raise ValueError( 

1185 f"Unknown correlation method: {method}. \ 

1186 Must be 'pearson' or 'spearman'." 

1187 ) 

1188 

1189 return result 

1190 

1191 @staticmethod 

1192 def _pearson( 

1193 mat: np.ndarray, cov: Optional[bool] = False, minp: Optional[int] = 1 

1194 ) -> np.ndarray: 

1195 """ 

1196 Based on Pandas correlation method as implemented here: 

1197 https://github.com/pandas-dev/pandas/blob/main/pandas/_libs/algos.pyx 

1198 

1199 Compute Pearson correlation between columns of `mat`, 

1200 permitting missing values (NaN or ±Inf). 

1201 

1202 Args: 

1203 mat : array_like, shape (N, K) 

1204 Input data. 

1205 minp : int, optional 

1206 Minimum number of paired observations required to form a correlation. 

1207 If the number of valid pairs for (i, j) is < minp, the result is NaN. 

1208 

1209 Returns: 

1210 corr : ndarray, shape (K, K) 

1211 Pearson correlation matrix. 

1212 """ 

1213 

1214 mat = np.asarray(mat, dtype=np.float64) 

1215 N, K = mat.shape 

1216 

1217 # pre‐compute finite‐mask 

1218 mask = np.isfinite(mat) 

1219 

1220 # output 

1221 result = np.empty((K, K), dtype=np.float64) 

1222 

1223 # TODO: optimize in future iterations 

1224 # loop over column‐pairs 

1225 for i in range(K): 

1226 for j in range(i + 1): 

1227 # find rows where both columns are finite 

1228 m = mask[:, i] & mask[:, j] 

1229 n = np.count_nonzero(m) 

1230 if n < minp: 

1231 # too few pairs 

1232 value = np.nan 

1233 else: 

1234 x = mat[m, i] 

1235 y = mat[m, j] 

1236 

1237 # compute means 

1238 mean_x = x.mean() 

1239 mean_y = y.mean() 

1240 

1241 # demeaned data 

1242 dx = x - mean_x 

1243 dy = y - mean_y 

1244 

1245 # sum of squares and cross‐prod 

1246 ssx = np.dot(dx, dx) 

1247 ssy = np.dot(dy, dy) 

1248 cxy = np.dot(dx, dy) 

1249 

1250 if cov: 

1251 # sample covariance (denominator n−1) 

1252 value = cxy / (n - 1) if n > 1 else np.nan 

1253 else: 

1254 # Pearson r = cov / (σx σy) 

1255 denom = np.sqrt(ssx * ssy) 

1256 if denom == 0.0: 

1257 value = np.nan 

1258 else: 

1259 value = cxy / denom 

1260 # clip numerical drift 

1261 if value > 1.0: 

1262 value = 1.0 

1263 elif value < -1.0: 

1264 value = -1.0 

1265 

1266 result[i, j] = result[j, i] = value 

1267 

1268 return result 

1269 

1270 def _spearman(mat: np.ndarray, minp: Optional[int] = 1) -> np.ndarray: 

1271 """ 

1272 Based on Pandas correlation method as implemented here: 

1273 https://github.com/pandas-dev/pandas/blob/main/pandas/_libs/algos.pyx 

1274 

1275 Compute Spearman correlation between columns of `mat`, 

1276 permitting missing values (NaN or ±Inf). 

1277 

1278 Args: 

1279 mat : array_like, shape (N, K) 

1280 Input data. 

1281 minp : int, optional 

1282 Minimum number of paired observations required to form a correlation. 

1283 If the number of valid pairs for (i, j) is < minp, the result is NaN. 

1284 

1285 Returns: 

1286 corr : ndarray, shape (K, K) 

1287 Spearman correlation matrix. 

1288 """ 

1289 N, K = mat.shape 

1290 # trivial all-NaN answer if too few rows 

1291 if N < minp: 

1292 return np.full((K, K), np.nan, dtype=float) 

1293 

1294 # mask of finite entries 

1295 mask = np.isfinite(mat) # shape (N, K), dtype=bool 

1296 

1297 # precompute ranks column-wise ignoring NaNs 

1298 ranks = np.full((N, K), np.nan, dtype=float) 

1299 for j in range(K): 

1300 valid = mask[:, j] 

1301 if valid.any(): 

1302 # rankdata by default gives average ranks for ties 

1303 ranks[valid, j] = rankdata(mat[valid, j], method="average") 

1304 

1305 # allocate result 

1306 result = np.empty((K, K), dtype=float) 

1307 

1308 # TODO: optimize in future iterations 

1309 # loop lower triangle (including diagonal) 

1310 for i in range(K): 

1311 for j in range(i + 1): 

1312 # find rows where both columns are finite 

1313 valid = mask[:, i] & mask[:, j] 

1314 nobs = valid.sum() 

1315 

1316 if nobs < minp: 

1317 rho = np.nan 

1318 else: 

1319 xi = ranks[valid, i] 

1320 yj = ranks[valid, j] 

1321 # subtract means 

1322 xi = xi - xi.mean() 

1323 yj = yj - yj.mean() 

1324 num = np.dot(xi, yj) 

1325 den = np.sqrt(np.dot(xi, xi) * np.dot(yj, yj)) 

1326 rho = num / den if den > 0 else np.nan 

1327 

1328 result[i, j] = rho 

1329 result[j, i] = rho 

1330 

1331 return result 

1332 

1333 @staticmethod 

1334 def _weighting(fourier_fingerprint: np.ndarray) -> np.ndarray: 

1335 """ 

1336 Performs weighting on the given correlation matrix. 

1337 Here, low-frequent coefficients are weighted more heavily. 

1338 

1339 Args: 

1340 correlation (np.ndarray): Correlation matrix 

1341 """ 

1342 # TODO: in Future iterations, this can be optimized by computing 

1343 # on the trimmed matrix instead. 

1344 

1345 assert ( 

1346 fourier_fingerprint.shape[0] % 2 != 0 

1347 and fourier_fingerprint.shape[1] % 2 != 0 

1348 ), "Correlation matrix must have odd dimensions. \ 

1349 Hint: use `trim` argument when calling `get_spectrum`." 

1350 assert ( 

1351 fourier_fingerprint.shape[0] == fourier_fingerprint.shape[1] 

1352 ), "Correlation matrix must be square." 

1353 

1354 def quadrant_to_matrix(a: np.ndarray) -> np.ndarray: 

1355 """ 

1356 Transforms [[1,2],[3,4]] to 

1357 [[1,2,1],[3,4,3],[1,2,1]] 

1358 

1359 Args: 

1360 a (np.ndarray): _description_ 

1361 

1362 Returns: 

1363 np.ndarray: _description_ 

1364 """ 

1365 # rotates a from [[1,2],[3,4]] to [[3,4],[1,2]] 

1366 a_rot = np.rot90(a) 

1367 # merge the two matrices 

1368 left = np.concat([a, a_rot]) 

1369 # merges left and right (left flipped) 

1370 b = np.concat( 

1371 [left, np.flip(left)], 

1372 axis=1, 

1373 ) 

1374 # remove the middle column and row 

1375 return np.delete( 

1376 np.delete(b, (b.shape[0] // 2), axis=0), (b.shape[1] // 2), axis=1 

1377 ) 

1378 

1379 nc = fourier_fingerprint.shape[0] // 2 + 1 

1380 weights = nnp.mgrid[0:nc:1, 0:nc:1].sum(axis=0) / ((nc - 1) * 2) 

1381 weights_matrix = quadrant_to_matrix(weights) 

1382 

1383 return fourier_fingerprint * weights_matrix 

1384 raise NotImplementedError("Weighting method is not implemented")