diff --git a/mpqp/core/circuit.py b/mpqp/core/circuit.py index 6261f433..23276eb2 100644 --- a/mpqp/core/circuit.py +++ b/mpqp/core/circuit.py @@ -34,7 +34,7 @@ from __future__ import annotations from copy import deepcopy -from numbers import Complex +from numbers import Complex, Number from typing import TYPE_CHECKING, Literal, Optional, Sequence, Type, Union, overload from warnings import warn @@ -70,7 +70,7 @@ from braket.circuits import Circuit as braket_Circuit from cirq.circuits.circuit import Circuit as cirq_Circuit from qat.core.wrappers.circuit import Circuit as myQLM_Circuit - from qiskit.circuit import QuantumCircuit + from qiskit.circuit import Parameter, QuantumCircuit from qiskit_aer import AerSimulator from sympy import Basic, Expr @@ -171,7 +171,10 @@ def __init__( self._user_nb_qubits: Optional[int] = None self._nb_qubits: int - self.transpiled_circuit: "Optional[Union[braket_Circuit, cirq_Circuit, myQLM_Circuit, QuantumCircuit]]" = (None) + self.transpiled_circuit: dict[ + AvailableDevice, + Union[braket_Circuit, cirq_Circuit, myQLM_Circuit, QuantumCircuit], + ] = {} """A pre-transpiled circuit to skip repeated transpilation when running the circuit. Useful when working with a symbolic circuit that needs to be executed with different parameters.""" @@ -2152,6 +2155,15 @@ def __repr__(self) -> str: return f'QCircuit({args_repr})' + def transpiled_for_device(self, device: AvailableDevice): + if device not in self.transpiled_circuit: + self.transpiled_circuit[device] = ( + self.to_other_device( # pyright: ignore[reportCallIssue] + device # pyright: ignore[reportArgumentType] + ) + ) + return self.transpiled_circuit[device] + def variables(self) -> set[Basic]: """Returns all the symbolic parameters involved in this circuit. @@ -2176,3 +2188,41 @@ def variables(self) -> set[Basic]: if isinstance(param, Expr): params.update(param.free_symbols) return params + + def bind_parameters( + self, device: AvailableDevice, values: dict[str | Parameter | Basic, Number] + ) -> QCircuit: + """Bind parameter values to the transpiled circuit.""" + # TODO: to enhance docs + if device not in self.transpiled_circuit: + self.transpiled_for_device(device) + transpiled = self.transpiled_circuit[device] + + param_values = {str(k): v for k, v in values.items()} + + bound_circuit = deepcopy(self) + bound_circuit.transpiled_circuit = dict(self.transpiled_circuit) + + from qiskit import QuantumCircuit + + if isinstance(transpiled, QuantumCircuit): + qiskit_param_map = { + p: param_values[p.name] + for p in transpiled.parameters + if p.name in param_values + } + bound_circuit.transpiled_circuit[device] = ( + transpiled.assign_parameters(qiskit_param_map) + if qiskit_param_map + else transpiled + ) + return bound_circuit + + elif isinstance(transpiled, braket_Circuit): + bound_circuit.transpiled_circuit[device] = transpiled.make_bound_circuit( + param_values + ) + return bound_circuit + + else: + raise TypeError(f"Unsupported transpiled circuit yet: {type(transpiled)}") diff --git a/mpqp/core/instruction/measurement/expectation_value.py b/mpqp/core/instruction/measurement/expectation_value.py index 63fea6d4..5e926d53 100644 --- a/mpqp/core/instruction/measurement/expectation_value.py +++ b/mpqp/core/instruction/measurement/expectation_value.py @@ -30,6 +30,7 @@ class to define your observable, and a :class:`ExpectationMeasure` to perform if TYPE_CHECKING: from braket.circuits.observables import Hermitian, Sum + from braket.circuits import Circuit as braket_Circuit from cirq.circuits.circuit import Circuit as CirqCircuit from cirq.ops.linear_combinations import PauliSum as CirqPauliSum from cirq.ops.pauli_string import PauliString as CirqPauliString @@ -39,6 +40,7 @@ class to define your observable, and a :class:`ExpectationMeasure` to perform from sympy import Expr from mpqp.core.instruction.gates.custom_controlled_gate import Gate + from mpqp.execution.devices import AvailableDevice class Observable: @@ -81,8 +83,15 @@ def __init__( self._is_diagonal = None self._diag_elements: Optional[npt.NDArray[np.float64]] = None self.label = label - "See parameter description." - self.pre_transpiled = None + """See parameter description.""" + self.pre_transpiled: dict[ + AvailableDevice, + Union[ + SparsePauliOp, QLMObservable, CirqPauliSum, CirqPauliString, Hermitian + ], + ] = {} # TODO: do we put None, or empty dict ? + # TODO: docstring + """TODO: documentation""" if isinstance(observable, PauliString): self.nb_qubits = observable.nb_qubits @@ -437,7 +446,17 @@ def __init__( """See parameter description.""" self.optimize_measurement = optimize_measurement """See parameter description.""" - self.pre_transpiled = None + # TODO: do we need both pre_transpiled and translated_pre_measures ? + # self.pre_transpiled = None + # TODO : docstring + """TODO""" + self.translated_pre_measures: dict[ + AvailableDevice, + tuple[list[dict[str, npt.NDArray[np.float64]]], list[braket_Circuit]], + ] = {} + # TODO : docstring + """TODO""" + if isinstance(observable, Observable): observable = [observable] else: @@ -622,3 +641,44 @@ def to_dict(self): and not attr_name.startswith("__") and not callable(getattr(self, attr_name)) } + + # TODO: double check this method + def pre_transpile_observables(self, device: AvailableDevice): + from mpqp.execution.devices import AWSDevice, IBMDevice + + if device in self.translated_pre_measures: + return + + if isinstance(device, AWSDevice): + from mpqp.core.circuit import QCircuit + from mpqp.tools.pauli_grouping import ( + find_qubitwise_rotations, + pauli_monomial_eigenvalues, + ) + + grouping = self.get_pauli_grouping() + transpiled_pre_measures = [ + QCircuit(find_qubitwise_rotations(group)).to_other_language( + Language.BRAKET + ) + for group in grouping + ] + eigenvalues = [ + {monom.name: pauli_monomial_eigenvalues(monom) for monom in group} + for group in grouping + ] + self.translated_pre_measures[device] = ( + eigenvalues, + transpiled_pre_measures, + ) + + elif isinstance(device, IBMDevice): + for observable in self.observables: + if device not in observable.pre_transpile: + observable.pre_transpile[device] = observable.to_other_language( + language=Language.QISKIT + ) + else: + raise NotImplementedError( + f"Pre-transpilation for device {type(device).__name__} is not implemented." + ) diff --git a/mpqp/execution/__init__.py b/mpqp/execution/__init__.py index 561e0cae..25771d7a 100644 --- a/mpqp/execution/__init__.py +++ b/mpqp/execution/__init__.py @@ -3,14 +3,14 @@ ATOSDevice, AvailableDevice, AWSDevice, + AZUREDevice, GOOGLEDevice, IBMDevice, - AZUREDevice, ) -from .simulated_devices import IBMSimulatedDevice -from .job import Job, JobStatus, JobType +from .job import ExecutionMode, Job, JobStatus, JobType from .result import BatchResult, Result, Sample, StateVector from .runner import adjust_measure, run, submit +from .simulated_devices import IBMSimulatedDevice # This import has to be done after the loading of result to work, `pass` is a # trick to avoid isort to move this line above diff --git a/mpqp/execution/connection/ibm_connection.py b/mpqp/execution/connection/ibm_connection.py index aa1718dd..9bd31810 100644 --- a/mpqp/execution/connection/ibm_connection.py +++ b/mpqp/execution/connection/ibm_connection.py @@ -1,5 +1,5 @@ from getpass import getpass -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Optional from termcolor import colored @@ -9,10 +9,11 @@ if TYPE_CHECKING: from qiskit.providers.backend import BackendV2 - from qiskit_ibm_runtime import QiskitRuntimeService + from qiskit_ibm_runtime import QiskitRuntimeService, Session Runtime_Service = None +_ibm_sessions: dict[str, "Session"] = {} def config_ibm_account(token: str): @@ -208,3 +209,49 @@ def get_all_job_ids() -> list[str]: if get_env_variable("IBM_CONFIGURED") == "True": return [job.job_id() for job in get_QiskitRuntimeService().jobs(limit=None)] return [] + + +def get_or_create_ibm_session( + backend: "BackendV2", max_time: Optional[int] = None +) -> "Session": + """Get an active IBM Runtime session for the given backend. + If a session exists and is valid, reuse it. Otherwise, create a new one. + """ + from qiskit_ibm_runtime import Session + from qiskit_ibm_runtime.exceptions import IBMNotAuthorizedError, IBMRuntimeError + + backend_name = backend.name + if backend_name in _ibm_sessions: + session = _ibm_sessions[backend_name] + try: + status = session.status() + except (IBMNotAuthorizedError, IBMRuntimeError): + status = "Closed" + + if status not in ("Closed", None): + return session + + new_session = Session(backend=backend, max_time=max_time) + _ibm_sessions[backend_name] = new_session + return new_session + + +def close_ibm_session(backend: "BackendV2") -> None: + """Close the currently active session, if one exists.""" + + from qiskit_ibm_runtime.exceptions import IBMNotAuthorizedError, IBMRuntimeError + + backend_name = backend.name + if backend_name not in _ibm_sessions: + return + + session = _ibm_sessions[backend_name] + try: + session.close() + except (IBMNotAuthorizedError, IBMRuntimeError) as err: + raise IBMRemoteExecutionError( + f"Failed to close IBM session for backend '{backend_name}'. " + "Session was not removed; you can retry.\nTrace: " + str(err) + ) + else: + del _ibm_sessions[backend_name] diff --git a/mpqp/execution/job.py b/mpqp/execution/job.py index 08d23d48..0e957fb3 100644 --- a/mpqp/execution/job.py +++ b/mpqp/execution/job.py @@ -13,9 +13,12 @@ from __future__ import annotations +from numbers import Number from typing import TYPE_CHECKING, Optional from aenum import Enum, NoAlias, auto +from qiskit.circuit import Parameter +from sympy import Basic from mpqp.tools.generics import MessageEnum @@ -70,6 +73,19 @@ class JobType(Enum): retrieve the expectation value in an optimal manner.""" +class ExecutionMode(Enum): + """Execution mode for remote backends. + It controls how jobs are submitted (single job, batch and session). + """ + + JOB = "JOB" + BATCH = "BATCH" + SESSION = "SESSION" + + def __str__(self): + return self.value + + class Job: """Representation of a job, an object regrouping all the information about the submission of a computation/measure of a quantum circuit on a @@ -87,6 +103,7 @@ class Job: device: Device (simulator, quantum computer) on which we want to execute the job. measure: Object representing the measure to perform. + mode: Remote execution mode (JOB(single), BATCH(batch/params) and SESSION(IBM Runtime Session)). Examples: >>> circuit = QCircuit(3) @@ -110,6 +127,7 @@ def __init__( job_type: JobType, circuit: QCircuit, device: AvailableDevice, + mode: ExecutionMode = ExecutionMode.JOB, ): self._status = JobStatus.INIT @@ -119,12 +137,21 @@ def __init__( """See parameter description.""" self.device = device """See parameter description.""" + self.mode = mode + """See parameter description.""" self.id: Optional[str] = None """Contains the id of the remote job, used to retrieve the result from the remote provider. ``None`` if the job is local. It can take a little while before it is set to the right value (For instance, a job submission can require handshake protocols to conclude before attributing an id to the job).""" + self.values: Optional[dict[str | Parameter | Basic, Number]] = None + """Parameter bindings for circuits containing symbolic variables. + + For local execution, parameters are typically substituted directly into the + circuit prior to execution. For remote execution, these bindings are stored in the + ``Job`` so the provider can bind them on the transpiled circuit without + re-transpiling the circuit each time.""" @property def measure(self) -> Optional[Measure]: @@ -168,7 +195,8 @@ def status(self, job_status: JobStatus): self._status = job_status def __repr__(self) -> str: - return f"{type(self).__name__}({self.job_type}, {repr(self.circuit)}, {self.device})" + # TODO: improve repr + return f"{type(self).__name__}({self.job_type}, {repr(self.circuit)}, {self.device}, mode={self.mode})" def __eq__(self, other): # pyright: ignore[reportMissingParameterType] if not isinstance(other, Job): @@ -185,6 +213,7 @@ def to_dict(self): "job_type": self.job_type, "circuit": self.circuit, "device": self.device, + "mode": self.mode, "measure": self.measure, "id": self.id, "status": self.status, @@ -218,7 +247,7 @@ def load_by_local_id(job_id: int): Uses :func:`~mpqp.local_storage.load.get_jobs_with_id`. Args: - job_id: Local id of the job you need. + job_id: Local id of the job you need. Example: >>> Job.load_by_local_id(1) # doctest: +ELLIPSIS diff --git a/mpqp/execution/providers/atos.py b/mpqp/execution/providers/atos.py index 97e816a1..69f3b554 100644 --- a/mpqp/execution/providers/atos.py +++ b/mpqp/execution/providers/atos.py @@ -82,12 +82,10 @@ def job_pre_processing(job: Job) -> "Circuit": "`OBSERVABLE` jobs with shots!=0 are disabled for MPO." ) - if job.circuit.transpiled_circuit is None: - if TYPE_CHECKING: - assert isinstance(job.device, ATOSDevice) - myqlm_circuit = job.circuit.to_other_device(job.device) - else: - myqlm_circuit = job.circuit.transpiled_circuit + if TYPE_CHECKING: + assert isinstance(job.device, ATOSDevice) + + myqlm_circuit = job.circuit.transpiled_for_device(job.device) return myqlm_circuit @@ -250,12 +248,14 @@ def generate_observable_job(myqlm_circuit: "Circuit", job: Job) -> list["JobQLM" # TODO: [multi-obs] update this to take into account the case when we have list of Observables if TYPE_CHECKING: assert job.measure is not None and isinstance(job.measure, ExpectationMeasure) + result = [] for obs in job.measure.observables: - if obs.pre_transpiled is None: - qlm_obs = obs.to_other_language(Language.MY_QLM) - else: - qlm_obs = obs.pre_transpiled + if job.device not in obs.pre_transpiled: + obs.pre_transpiled[job.device] = obs.to_other_language(Language.MY_QLM) + + qlm_obs = obs.pre_transpiled[job.device] + result.append( myqlm_circuit.to_job( job_type="OBS", diff --git a/mpqp/execution/providers/aws.py b/mpqp/execution/providers/aws.py index b74e35f0..4f37620b 100644 --- a/mpqp/execution/providers/aws.py +++ b/mpqp/execution/providers/aws.py @@ -85,7 +85,9 @@ def apply_noise_to_braket_circuit( return noisy_circuit -def run_braket(job: Job) -> Result: +def run_braket(job: Job, reservation_arn: Optional[str] = None) -> Result: + # TODO: check if we keep just reservation_arn, or if we provide change it to `provider_specific_options` dict, + # to be more generic """Executes the job on the right AWS Braket device (local or remote) precised in the job in parameter and waits until the task is completed, then returns the Result. @@ -112,8 +114,8 @@ def run_braket(job: Job) -> Result: from braket.tasks import GateModelQuantumTaskResult if isinstance(job.measure, ExpectationMeasure): - return run_braket_observable(job) - _, task = submit_job_braket(job) + return run_braket_observable(job, reservation_arn) + _, task = submit_job_braket(job, reservation_arn) res = task.result() if TYPE_CHECKING: assert isinstance(res, GateModelQuantumTaskResult) @@ -121,7 +123,9 @@ def run_braket(job: Job) -> Result: return extract_result(res, job, job.device) -def run_braket_observable(job: Job): +def run_braket_observable(job: Job, reservation_arn: Optional[str] = None): + # TODO: check if we keep just reservation_arn, or if we provide change it to `provider_specific_options` dict, + # to be more generic """Returns the result of an ``OBSERVABLE`` job. TODO: check that the link bellow is correctly generated. @@ -141,11 +145,9 @@ def run_braket_observable(job: Job): from braket.tasks import GateModelQuantumTaskResult assert isinstance(job.device, AWSDevice) - if job.circuit.transpiled_circuit is None: - transpiled_circuit = job.circuit.to_other_device(job.device) - else: - transpiled_circuit = job.circuit.transpiled_circuit - assert isinstance(transpiled_circuit, Circuit) + + transpiled_circuit = job.circuit.transpiled_for_device(job.device) + assert isinstance(transpiled_circuit, Circuit) device = get_braket_device( job.device, @@ -156,32 +158,21 @@ def run_braket_observable(job: Job): assert isinstance(job.measure, ExpectationMeasure) results, errors = {}, {} + if job.measure.optimize_measurement: - from mpqp.tools.pauli_grouping import ( - find_qubitwise_rotations, - pauli_monomial_eigenvalues, + # TODO: review this if it can be rewritten or regrouped. What if the observable were already transpiled? + # we still call it? + job.measure.pre_transpile_observables(job.device) + eigenvalues_by_group, transpiled_pre_measures = ( + job.measure.translated_pre_measures[job.device] ) + # TODO: check that the transpilation of observables/premeasure is done/required correctly - if job.measure.pre_transpiled is None: - grouping = job.measure.get_pauli_grouping() - transpiled_pre_measures = [ - QCircuit(find_qubitwise_rotations(group)).to_other_language( - Language.BRAKET - ) - for group in grouping - ] - eigenvalues = [ - {monom.name: pauli_monomial_eigenvalues(monom) for monom in group} - for group in grouping - ] - - else: - eigenvalues, transpiled_pre_measures = ( - job.measure.pre_transpiled - ) # pyright: ignore[reportGeneralTypeIssues] + expectation_values: dict[str, float] = {} - expectation_values = {} - for eigenvalues, pre_measure in zip(eigenvalues, transpiled_pre_measures): + for group_eigenvalues, pre_measure in zip( + eigenvalues_by_group, transpiled_pre_measures + ): job.status = JobStatus.RUNNING if job.measure.shots == 0: from copy import deepcopy @@ -203,6 +194,7 @@ def run_braket_observable(job: Job): ) result = local_result.result() assert isinstance(result, GateModelQuantumTaskResult) + length = 2**job.circuit.nb_qubits sorted_values: list[float] = [] for i in range(length): @@ -213,7 +205,7 @@ def run_braket_observable(job: Job): ) else: sorted_values.append(0) - for name, eigenvalue in eigenvalues.items(): + for name, eigenvalue in group_eigenvalues.items(): expectation_value: float = np.dot( eigenvalue, np.array(sorted_values, dtype=np.float64), @@ -228,6 +220,7 @@ def run_braket_observable(job: Job): local += expectation_values[monoms.name] * monoms.coef results.update({f"observable_{i}": local}) errors.update({f"observable_{len(errors)}": None}) + if len(results) == 1: return Result(job, results["observable_0"], shots=job.measure.shots) return Result(job, results, errors, shots=job.measure.shots) @@ -300,10 +293,14 @@ def run_braket_observable(job: Job): if len(results) == 1: return Result(job, results["observable_0"], None, job.measure.shots) + return Result(job, results, errors, job.measure.shots) -def submit_job_braket(job: Job) -> tuple[str, "QuantumTask"]: +def submit_job_braket( + job: Job, reservation_arn: Optional[str] = None +) -> tuple[str, "QuantumTask"]: + # TODO: change reservation_arn to more generic parameter for provider_options """Submits the job to the right local/remote device and returns the generated task. @@ -351,14 +348,10 @@ def submit_job_braket(job: Job) -> tuple[str, "QuantumTask"]: from braket.circuits import Circuit device = get_braket_device(job.device, is_noisy=is_noisy) - - if job.circuit.transpiled_circuit is None: - braket_circuit = job.circuit.to_other_device(job.device) - else: - braket_circuit = job.circuit.transpiled_circuit - + braket_circuit = job.circuit.transpiled_for_device(job.device) if TYPE_CHECKING: assert isinstance(braket_circuit, Circuit) + if job.job_type == JobType.STATE_VECTOR: # rebind safe_retrieve_samples from braket to Normalize the probability # because the bracket does not do so and this causes a crash. @@ -390,9 +383,12 @@ def safe_retrieve_samples(self): # pyright: ignore[reportMissingParameterType] task = device.run(braket_circuit, shots=job.measure.shots, inputs=None) elif job.job_type == JobType.OBSERVABLE: - # TODO : [multi-obs] update this to take into account the case when we have list of Observables if TYPE_CHECKING: assert isinstance(job.measure, ExpectationMeasure) + + # TODO : [multi-obs] update this to take into account the case when we have list of Observables + # and in this case looping over the list of observables is not sufficient, we have to take into account the + # grouping and do it manually if job.measure.optimize_measurement is true otherwise do it sequentially if job.measure.observables[0].pre_transpiled is None: herm_op = job.measure.observables[0].to_other_language(Language.BRAKET) else: @@ -608,3 +604,15 @@ def estimate_cost_single_job( else: return 0 + + +# TODO: finish this implementation, why did we need context manager ? +# @contextmanager +def optional_reservation_arn(device: AWSDevice, reservation_arn: Optional[str] = None): + from braket.aws import DirectReservation + + if reservation_arn: + with DirectReservation(device.get_arn(), reservation_arn=reservation_arn): + yield + else: + yield diff --git a/mpqp/execution/providers/azure.py b/mpqp/execution/providers/azure.py index f3cfa848..60d5361e 100644 --- a/mpqp/execution/providers/azure.py +++ b/mpqp/execution/providers/azure.py @@ -13,7 +13,7 @@ get_jobs_by_id, ) from mpqp.execution.devices import AZUREDevice -from mpqp.execution.job import IBMDevice, Job, JobStatus, JobType +from mpqp.execution.job import Job, JobStatus, JobType from mpqp.execution.result import Result, Sample @@ -54,10 +54,7 @@ def submit_job_azure(job: Job) -> tuple[str, "AzureQuantumJob"]: """ from qiskit import QuantumCircuit - if job.circuit.transpiled_circuit is None: - qiskit_circuit = job.circuit.to_other_device(IBMDevice.AER_SIMULATOR) - else: - qiskit_circuit = job.circuit.transpiled_circuit + qiskit_circuit = job.circuit.transpiled_for_device(job.device) if TYPE_CHECKING: assert isinstance(qiskit_circuit, QuantumCircuit) diff --git a/mpqp/execution/providers/google.py b/mpqp/execution/providers/google.py index adae0652..79744b47 100644 --- a/mpqp/execution/providers/google.py +++ b/mpqp/execution/providers/google.py @@ -248,13 +248,14 @@ def run_cirq_observable( errors = {} expectation_values = {} for obs in job.measure.observables: - - if obs.pre_transpiled is None: + if job.device not in obs.pre_transpiled: cirq_obs = obs.to_other_language( language=Language.CIRQ, circuit=circuit ) + obs.pre_transpile[job.device] = cirq_obs else: - cirq_obs = obs.pre_transpiled + cirq_obs = obs.pre_transpiled[job.device] + if TYPE_CHECKING: assert type(cirq_obs) in (CirqPauliSum, CirqPauliString) job.status = JobStatus.RUNNING @@ -409,11 +410,7 @@ def run_google_remote(job: Job) -> Result: import cirq_ionq as ionq from cirq.circuits.circuit import Circuit as CirqCircuit - if job.circuit.transpiled_circuit is None: - job_CirqCircuit = job.circuit.to_other_device(job.device) - else: - job_CirqCircuit = job.circuit.transpiled_circuit - + job_CirqCircuit = job.circuit.transpiled_for_device(job.device) if TYPE_CHECKING: assert isinstance(job_CirqCircuit, CirqCircuit) @@ -467,11 +464,7 @@ def run_local(job: Job) -> Result: if job.device.is_processor(): return run_local_processor(job) - if job.circuit.transpiled_circuit is None: - cirq_circuit = job.circuit.to_other_device(job.device) - else: - cirq_circuit = job.circuit.transpiled_circuit - + cirq_circuit = job.circuit.transpiled_for_device(job.device) if TYPE_CHECKING: assert isinstance(cirq_circuit, CirqCircuit) @@ -531,11 +524,7 @@ def run_local_processor(job: Job) -> Result: ) simulator = SimulatedLocalEngine([sim_processor]) - if job.circuit.transpiled_circuit is None: - cirq_circuit = job.circuit.to_other_device(job.device) - else: - cirq_circuit = job.circuit.transpiled_circuit - + cirq_circuit = job.circuit.transpiled_for_device(job.device) if TYPE_CHECKING: assert isinstance(cirq_circuit, CirqCircuit) diff --git a/mpqp/execution/providers/ibm.py b/mpqp/execution/providers/ibm.py index 1d2afd56..d2e98218 100644 --- a/mpqp/execution/providers/ibm.py +++ b/mpqp/execution/providers/ibm.py @@ -3,7 +3,7 @@ import math import warnings from copy import deepcopy -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING, Optional, Union import numpy as np @@ -18,8 +18,8 @@ get_QiskitRuntimeService, ) from mpqp.execution.devices import AZUREDevice, IBMDevice -from mpqp.execution.job import Job, JobStatus, JobType -from mpqp.execution.result import Result, Sample, StateVector +from mpqp.execution.job import ExecutionMode, Job, JobStatus, JobType +from mpqp.execution.result import BatchResult, Result, Sample, StateVector from mpqp.noise import DimensionalNoiseModel from mpqp.tools.errors import ( DeviceJobIncompatibleError, @@ -36,16 +36,18 @@ PubResult, SamplerPubResult, ) + from qiskit.providers.backend import BackendV2 from qiskit.quantum_info import SparsePauliOp from qiskit.result import Result as QiskitResult from qiskit_aer import AerSimulator from qiskit_aer.noise import NoiseModel as Qiskit_NoiseModel - from qiskit_ibm_runtime import RuntimeJobV2 + from qiskit_ibm_runtime import RuntimeJobV2, Session from mpqp.execution.simulated_devices import StaticIBMSimulatedDevice def run_ibm(job: Job) -> Result: + # TODO: update docs """Executes the job on the right IBM Q device precised in the job in parameter. @@ -59,7 +61,13 @@ def run_ibm(job: Job) -> Result: This function is not meant to be used directly, please use :func:`~mpqp.execution.runner.run` instead. """ - return run_aer(job) if not job.device.is_remote() else run_remote_ibm(job) + if not job.device.is_remote(): + return run_aer(job) + + if job.mode == ExecutionMode.SESSION: + return run_remote_ibm_session(job) + + return run_remote_ibm(job) def compute_expectation_value( @@ -99,11 +107,9 @@ def compute_expectation_value( nb_shots = job.measure.shots qiskit_observables: list[SparsePauliOp] = [] + job.measure.pre_transpile_observables(job.device) for obs in job.measure.observables: - if obs.pre_transpiled is None: - translated = obs.to_other_language(Language.QISKIT) - else: - translated = obs.pre_transpiled + translated = obs.pre_transpile[job.device] if TYPE_CHECKING: assert isinstance(translated, SparsePauliOp) qiskit_observables.append(translated) @@ -444,6 +450,7 @@ def run_aer(job: Job): job_circuit = job.circuit if TYPE_CHECKING: assert isinstance(job.device, (IBMDevice, StaticIBMSimulatedDevice)) + if isinstance(job.device, StaticIBMSimulatedDevice): if len(job.circuit.noises) != 0: warnings.warn( @@ -454,30 +461,25 @@ def run_aer(job: Job): # (grab qiskit NoiseModel from AerSimulator generated below, and add # to it directly) backend_sim = job.device.to_noisy_simulator() + job_circuit = job.circuit.transpiled_for_device(job.device) + elif len(job.circuit.noises) != 0: - if job.circuit.transpiled_circuit is not None: - if job.circuit.transpiled_noise_model is None: - raise InstructionParsingError( - "transpiled_noise_model is not initialized" - ) - backend_sim = AerSimulator( - method=job.device.value, noise_model=job.circuit.transpiled_noise_model - ) - else: - noise_model, modified_circuit = generate_qiskit_noise_model(job.circuit) - job_circuit = modified_circuit - backend_sim = AerSimulator(method=job.device.value, noise_model=noise_model) - else: - backend_sim = AerSimulator(method=job.device.value) + job_circuit = job.circuit.transpiled_for_device(job.device) - if job.circuit.transpiled_circuit is None: - qiskit_circuit = job_circuit.to_other_device( - job.device, backend_sim=backend_sim + if job.circuit.transpiled_noise_model is None: + raise InstructionParsingError("transpiled_noise_model is not initialized") + backend_sim = AerSimulator( + method=job.device.value, noise_model=job.circuit.transpiled_noise_model ) + else: - qiskit_circuit = job.circuit.transpiled_circuit - if TYPE_CHECKING: - assert isinstance(qiskit_circuit, QuantumCircuit) + job_circuit = job.circuit.transpiled_for_device(job.device) + backend_sim = AerSimulator(method=job.device.value) + + qiskit_circuit = job_circuit + if TYPE_CHECKING: + assert isinstance(qiskit_circuit, QuantumCircuit) + if job.job_type == JobType.STATE_VECTOR: # the save_statevector method is patched on qiskit_aer load, meaning # the type checker can't find it. I hate it but it is what it is. @@ -494,9 +496,7 @@ def run_aer(job: Job): elif job.job_type == JobType.SAMPLE: if TYPE_CHECKING: assert job.measure is not None - job.status = JobStatus.RUNNING - job_sim = backend_sim.run(qiskit_circuit, shots=job.measure.shots) result_sim = job_sim.result() if TYPE_CHECKING: @@ -513,7 +513,10 @@ def run_aer(job: Job): return result -def submit_remote_ibm(job: Job) -> tuple[str, "RuntimeJobV2"]: +def _submit_remote_ibm( + job: Job, runtime_target: Union[BackendV2, Session] +) -> tuple[str, "RuntimeJobV2"]: + # TODO: rewrite docs if needed """Submits the job on the remote IBM device (quantum computer or simulator). Args: @@ -527,48 +530,40 @@ def submit_remote_ibm(job: Job) -> tuple[str, "RuntimeJobV2"]: :func:`~mpqp.execution.runner.run` instead. """ from qiskit import QuantumCircuit + from qiskit.quantum_info import SparsePauliOp from qiskit_ibm_runtime import EstimatorV2 as Runtime_Estimator from qiskit_ibm_runtime import SamplerV2 as Runtime_Sampler - from qiskit_ibm_runtime import Session meas = job.measure - check_job_compatibility(job) - service = get_QiskitRuntimeService() - if TYPE_CHECKING: - assert isinstance(job.device, IBMDevice) - backend = get_backend(job.device) - job.device = IBMDevice(backend.name) - session = Session(service=service, backend=backend) - - if job.circuit.transpiled_circuit is None: - qiskit_circ = job.circuit.to_other_device(job.device) - else: - qiskit_circ = job.circuit.transpiled_circuit + circuit = job.circuit + if job.values is not None: + circuit = circuit.bind_parameters(job.device, job.values) + qiskit_circ = job.circuit.transpiled_for_device(job.device) if TYPE_CHECKING: assert isinstance(qiskit_circ, QuantumCircuit) if job.job_type == JobType.OBSERVABLE: if TYPE_CHECKING: assert isinstance(meas, ExpectationMeasure) - estimator = Runtime_Estimator(mode=session) - qiskit_observables = [ - ( - obs.to_other_language(Language.QISKIT) - if obs.pre_transpiled is None - else obs.pre_transpiled - ) - for obs in meas.observables - ] + + estimator = Runtime_Estimator(mode=runtime_target) + + meas.pre_transpile_observables(job.device) + + qiskit_observables: list[SparsePauliOp] = [] + + for obs in meas.observables: + translated = obs.pre_transpile[job.device] + if TYPE_CHECKING: + assert isinstance(translated, SparsePauliOp) + qiskit_observables.append(translated.apply_layout(qiskit_circ.layout)) + if TYPE_CHECKING: assert all(isinstance(obs, SparsePauliOp) for obs in qiskit_observables) - qiskit_observables = [ - obs.apply_layout(qiskit_circ.layout) for obs in qiskit_observables - ] - # We have to disable all the twirling options and set manually the number of circuits and shots per circuits twirling = getattr(estimator.options, "twirling", None) if twirling is not None: @@ -578,14 +573,14 @@ def submit_remote_ibm(job: Job) -> tuple[str, "RuntimeJobV2"]: twirling.shots_per_randomization = meas.shots setattr(estimator.options, "default_shots", meas.shots) - ibm_job = estimator.run([(qiskit_circ, qiskit_observables)]) elif job.job_type == JobType.SAMPLE: if TYPE_CHECKING: assert isinstance(meas, BasisMeasure) - sampler = Runtime_Sampler(mode=session) + sampler = Runtime_Sampler(mode=runtime_target) ibm_job = sampler.run([qiskit_circ], shots=meas.shots) + else: raise NotImplementedError( f"{job.job_type} not handled by remote remote IBM devices." @@ -596,6 +591,96 @@ def submit_remote_ibm(job: Job) -> tuple[str, "RuntimeJobV2"]: return job.id, ibm_job +def submit_remote_ibm(job: Job) -> tuple[str, "RuntimeJobV2"]: + # TODO: docs + if TYPE_CHECKING: + assert isinstance(job.device, IBMDevice) + backend = get_backend(job.device) + + try: + job.device = IBMDevice(backend.name) + except Exception: + pass + + return _submit_remote_ibm(job, runtime_target=backend) + + +def submit_remote_ibm_batch(jobs: list[Job]) -> tuple[list[str], "RuntimeJobV2"]: + # TODO: docs + if len(jobs) == 0: + raise ValueError( + "Can't submit an IBM batch: job list is empty. " + "Batch execution requires at leat one Job object" + ) + + reference_job = jobs[0] + if TYPE_CHECKING: + assert isinstance(reference_job.device, IBMDevice) + backend = get_backend(reference_job.device) + + from qiskit.quantum_info import SparsePauliOp + from qiskit_ibm_runtime import EstimatorV2 as Runtime_Estimator + + from mpqp.execution.connection.ibm_connection import get_or_create_ibm_session + + execution_target = ( + get_or_create_ibm_session(backend) + if reference_job.mode == ExecutionMode.SESSION + else backend + ) + estimator = Runtime_Estimator(mode=execution_target) + + per_job_circuits: list[QuantumCircuit] = [] + per_job_observables: list[list[SparsePauliOp]] = [] + + for job in jobs: + meas = job.measure + check_job_compatibility(job) + + circuit = job.circuit + if job.values is not None: + circuit = circuit.bind_parameters(job.device, job.values) + + qc = job.circuit.transpiled_for_device(job.device) + if TYPE_CHECKING: + assert isinstance(qc, QuantumCircuit) + + per_job_circuits.append(qc) + + if TYPE_CHECKING: + assert isinstance(meas, ExpectationMeasure) + meas.pre_transpile_observables(job.device) + + obs_list: list[SparsePauliOp] = [] + + for obs in meas.observables: + translated = obs.pre_transpile[job.device] + if TYPE_CHECKING: + assert isinstance(translated, SparsePauliOp) + obs_list.append(translated.apply_layout(qc.layout)) + + if TYPE_CHECKING: + assert all(isinstance(obs, SparsePauliOp) for obs in obs_list) + + per_job_observables.append(obs_list) + + estimator_input = list(zip(per_job_circuits, per_job_observables)) + ibm_job = estimator.run(estimator_input) + + job_ids = [ibm_job.job_id()] * len(jobs) + for job, job_id in zip(jobs, job_ids): + job.id = job_id + + return job_ids, ibm_job + + +def submit_remote_ibm_session( + job: Job, session: "Session" +) -> tuple[str, "RuntimeJobV2"]: + # TODO: docs + return _submit_remote_ibm(job, runtime_target=session) + + def run_remote_ibm(job: Job) -> Result: """Submits the job on the right IBM remote device, precised in the job in parameter, and waits until the job is completed. @@ -618,6 +703,33 @@ def run_remote_ibm(job: Job) -> Result: return extract_result(ibm_result, job, job.device) +def run_remote_ibm_session(job: Job) -> Result: + # TODO: docs + from mpqp.execution.connection.ibm_connection import get_or_create_ibm_session + + if TYPE_CHECKING: + assert isinstance(job.device, IBMDevice) + backend = get_backend(job.device) + session = get_or_create_ibm_session(backend) + + _, remote_job = submit_remote_ibm_session(job, session) + ibm_result = remote_job.result() + return extract_result(ibm_result, job, job.device) + + +def run_remote_ibm_batch(jobs: list[Job]) -> BatchResult: + _, remote_job = submit_remote_ibm_batch(jobs) + ibm_batch_results = remote_job.result() + + mpqp_batch_results = [] + for job, res in zip(jobs, ibm_batch_results): + if TYPE_CHECKING: + assert isinstance(job.device, IBMDevice) + mpqp_batch_results.append(extract_result(res, job, job.device)) + + return BatchResult(mpqp_batch_results) + + def extract_result( result: "QiskitResult | EstimatorResult | PrimitiveResult[PubResult | SamplerPubResult]", job: Optional[Job], diff --git a/mpqp/execution/runner.py b/mpqp/execution/runner.py index 7873eabb..a4d37345 100644 --- a/mpqp/execution/runner.py +++ b/mpqp/execution/runner.py @@ -18,9 +18,9 @@ from __future__ import annotations -from numbers import Complex +from numbers import Complex, Number from textwrap import indent -from typing import TYPE_CHECKING, Iterable, Optional, Sequence, overload +from typing import TYPE_CHECKING, Iterable, Optional, Sequence, Union, overload import numpy as np @@ -39,19 +39,53 @@ GOOGLEDevice, IBMDevice, ) -from mpqp.execution.job import Job, JobStatus, JobType +from mpqp.execution.job import ExecutionMode, Job, JobStatus, JobType from mpqp.execution.providers.atos import run_atos, submit_QLM from mpqp.execution.providers.aws import run_braket, submit_job_braket from mpqp.execution.providers.azure import run_azure, submit_job_azure from mpqp.execution.providers.google import run_google -from mpqp.execution.providers.ibm import run_ibm, submit_remote_ibm from mpqp.execution.result import BatchResult, Result from mpqp.tools.display import state_vector_ket_shape from mpqp.tools.errors import DeviceJobIncompatibleError, RemoteExecutionError from mpqp.tools.generics import OneOrMany, find_index, flatten if TYPE_CHECKING: - from sympy import Expr + from qiskit.circuit import Parameter + from sympy import Basic, Expr + + +ValuesKey = Union["Expr", "Parameter", "Basic", str] +ValuesDict = dict[ValuesKey, Number] +BatchValuesInput = Optional[Union[ValuesDict, Sequence[ValuesDict]]] + + +def prepare_run_batch_inputs( + circuits: list[QCircuit], + values: BatchValuesInput, +) -> tuple[list[QCircuit], list[Optional[ValuesDict]]]: + + # TODO: docs + + if values is None: + return circuits, [None] * len(circuits) + + if isinstance(values, dict): + return circuits, [values] * len(circuits) + values_list = list(values) + + if len(circuits) == 1 and len(values_list) > 1: + return [circuits[0] for _ in range(len(values_list))], list(values_list) + + if len(values_list) == 1 and len(circuits) > 1: + return circuits, [values_list[0]] * len(circuits) + + if len(values_list) == len(circuits): + return circuits, list(values_list) + + raise ValueError( + "In BATCH mode, number of circuits must match number of values dicts " + f"Got {len(circuits)} circuits and {len(values_list)} values sets." + ) def adjust_measure(measure: ExpectationMeasure, circuit: QCircuit): @@ -108,8 +142,10 @@ def adjust_measure(measure: ExpectationMeasure, circuit: QCircuit): def generate_job( circuit: QCircuit, device: AvailableDevice, - values: "Optional[dict[Expr | str, Complex]]" = None, + values: Optional[ValuesDict] = None, + exec_mode: Optional[ExecutionMode] = ExecutionMode.JOB, ) -> Job: + # TODO: docstring """Creates the Job of appropriate type and containing the information needed for the execution of the circuit. @@ -121,25 +157,40 @@ def generate_job( circuit: Circuit to be run. device: Device on which the circuit will be run. values: Set of values to substitute for symbolic variables. + exec_mode: Returns: The Job containing information about the execution of the circuit. """ - if values is not None: - circuit = circuit.subs(values, True) + if values is not None and not device.is_remote(): + from sympy import Expr + + subs_values: dict[Expr | str, Complex] = {} + for k, v in values.items(): + if isinstance(k, (str, Expr)): + if not isinstance(v, Complex): + raise TypeError( + f"Parameter binding requires numeric values; got {type(v).__name__}." + ) + subs_values[k] = v + + circuit = circuit.subs(subs_values, True) m_list = circuit.measurements nb_meas = len(m_list) if nb_meas == 0: - job = Job(JobType.STATE_VECTOR, circuit, device) + job = Job(JobType.STATE_VECTOR, circuit, device, exec_mode) + elif nb_meas == 1: measurement = m_list[0] if isinstance(measurement, BasisMeasure): - if measurement.shots <= 0: - job = Job(JobType.STATE_VECTOR, circuit, device) - else: - job = Job(JobType.SAMPLE, circuit, device) + job = ( + Job(JobType.STATE_VECTOR, circuit, device, exec_mode) + if measurement.shots <= 0 + else Job(JobType.SAMPLE, circuit, device, exec_mode) + ) + elif isinstance(measurement, ExpectationMeasure): m = adjust_measure(measurement, circuit) c = circuit.without_measurements(deep_copy=False) @@ -148,7 +199,9 @@ def generate_job( JobType.OBSERVABLE, c, device, + exec_mode, ) + else: raise NotImplementedError( f"Measurement type {type(measurement)} not handled" @@ -159,6 +212,9 @@ def generate_job( "circuit." ) + if values is not None and device.is_remote(): + job.values = values + return job @@ -167,13 +223,14 @@ def _run_diagonal_observables( exp_measure: ExpectationMeasure, device: AvailableDevice, observable_job: Job, - values: "Optional[dict[Expr | str, Complex]]" = None, + values: Optional[ValuesDict] = None, + mode: Optional[ExecutionMode] = ExecutionMode.JOB, ) -> Result: adapted_circuit = circuit.without_measurements(deep_copy=False) adapted_circuit.add(BasisMeasure(exp_measure.targets, shots=exp_measure.shots)) - result = _run_single(adapted_circuit, device, values, False) + result = _run_single(adapted_circuit, device, values, False, mode) probas = result.probabilities error = 0 if exp_measure.shots == 0 else None @@ -204,9 +261,12 @@ def _run_diagonal_observables( def _run_single( circuit: QCircuit, device: AvailableDevice, - values: "Optional[dict[Expr | str, Complex]]" = None, + values: Optional[ValuesDict] = None, display_breakpoints: bool = True, + mode: Optional[ExecutionMode] = ExecutionMode.JOB, + reservation_arn: Optional[str] = None, ) -> Result: + # TODO: docstring + replace reservation_arn by dict for provider specific options """Runs the circuit on the ``backend``. If the circuit depends on variables, the ``values`` given in parameters are used to do the substitution. @@ -249,13 +309,16 @@ def _run_single( for k in range(len(circuit.breakpoints)): display_kth_breakpoint(circuit, k, device) - job = generate_job(circuit, device, values) + job = generate_job(circuit, device, values, mode) job.status = JobStatus.INIT + if len(circuit.measurements) == 1: measure = circuit.measurements[0] if isinstance(measure, ExpectationMeasure): if measure.optim_diagonal and measure.only_diagonal_observables(): - return _run_diagonal_observables(circuit, measure, device, job, values) + return _run_diagonal_observables( + circuit, measure, device, job, values, mode + ) if len(circuit.noises) != 0: if not device.is_noisy_simulator(): @@ -268,15 +331,23 @@ def _run_single( raise NotImplementedError(f"Noisy simulations not supported on {device}.") if isinstance(device, (IBMDevice, StaticIBMSimulatedDevice)): + from mpqp.execution.providers.ibm import run_ibm, run_remote_ibm_batch + + if job.mode == ExecutionMode.BATCH and device.is_remote(): + batch_results = run_remote_ibm_batch([job]) + return batch_results[0] + return run_ibm(job) + elif isinstance(device, ATOSDevice): return run_atos(job) elif isinstance(device, AWSDevice): - return run_braket(job) + return run_braket(job, reservation_arn=reservation_arn) elif isinstance(device, GOOGLEDevice): return run_google(job) elif isinstance(device, AZUREDevice): return run_azure(job) + else: raise NotImplementedError(f"Device {device} not handled") @@ -285,17 +356,26 @@ def _run_single( def run( circuit: OneOrMany[QCircuit], device: Sequence[AvailableDevice], - values: "Optional[dict[Expr | str, Complex]]" = None, + values: BatchValuesInput = None, display_breakpoints: bool = True, + reservation_arn: Optional[str] = None, + mode: Optional[ExecutionMode] = None, + values_batch: Optional[list[ValuesDict]] = None, ) -> BatchResult: ... +# TODO: why using values and values_batch at the same time + + @overload def run( circuit: Sequence[QCircuit], device: OneOrMany[AvailableDevice], - values: "Optional[dict[Expr | str, Complex]]" = None, + values: Optional[ValuesDict] = None, display_breakpoints: bool = True, + reservation_arn: Optional[str] = None, + mode: Optional[ExecutionMode] = None, + values_batch: Optional[list[ValuesDict]] = None, ) -> BatchResult: ... @@ -303,16 +383,22 @@ def run( def run( circuit: QCircuit, device: AvailableDevice, - values: "Optional[dict[Expr | str, Complex]]" = None, + values: Optional[ValuesDict] = None, display_breakpoints: bool = True, + reservation_arn: Optional[str] = None, + mode: Optional[ExecutionMode] = None, + values_batch: Optional[list[ValuesDict]] = None, ) -> Result: ... def run( circuit: OneOrMany[QCircuit], device: OneOrMany[AvailableDevice], - values: "Optional[dict[Expr | str, Complex]]" = None, + values: BatchValuesInput = None, display_breakpoints: bool = True, + reservation_arn: Optional[str] = None, + mode: Optional[ExecutionMode] = None, + values_batch: Optional[list[ValuesDict]] = None, ) -> Result | BatchResult: """Runs the circuit on the backend, or list of backend, provided in parameter. @@ -389,19 +475,64 @@ def namer(circ: QCircuit, i: int): circ.label = f"circuit {i}" if circ.label is None else circ.label return circ - if isinstance(circuit, Iterable) or isinstance(device, Iterable): - return BatchResult( - [ - _run_single( - namer(circ, i + 1), - dev, - values, - display_breakpoints, + circuits = [circuit] if isinstance(circuit, QCircuit) else list(circuit) + devices = [device] if isinstance(device, AvailableDevice) else list(device) + + exec_mode = mode or ExecutionMode.JOB + + if values_batch is not None and exec_mode != ExecutionMode.BATCH: + raise ValueError("values_batch is only supported when mode == VQAMode.BATCH") + + if exec_mode == ExecutionMode.BATCH: + if len(devices) != 1: + raise ValueError( + "Batch mode is only defined for a single backend, but got " + f"{len(devices)} devices." + ) + + if values_batch is not None and len(values_batch) != len(circuits): + raise ValueError("values_batch must have the same length as circuits.") + + target_device = devices[0] + per_run_circuits, per_run_values = prepare_run_batch_inputs(circuits, values) + + jobs = [] + for i, circ in enumerate(per_run_circuits): + jobs.append( + generate_job( + namer(circ, i + 1), target_device, per_run_values[i], exec_mode ) - for i, circ in enumerate(flatten(circuit)) - for dev in flatten(device) - ] - ) + ) + + # TODO: batch only supported for IBM ? maybe raise an error otherwise. + # And why only observable here ? + if isinstance(target_device, IBMDevice) and target_device.is_remote(): + from mpqp.execution.providers.ibm import run_remote_ibm_batch + + for job in jobs: + if job.job_type != JobType.OBSERVABLE: + raise ValueError( + "IBM batch execution supports only observable jobs " + f"(found {job.job_type} in circuit '{job.circuit.label}')." + ) + return run_remote_ibm_batch(jobs) + + if isinstance(circuit, Iterable) or isinstance(device, Iterable): + return BatchResult( + [ + _run_single( + namer(circ, i + 1), + dev, + values, + display_breakpoints, + ) + for i, circ in enumerate(flatten(circuit)) + for dev in flatten(device) + ] + ) + + # TODO : remark, remove weird management of multi circuit and multi device, it was already done in a more + # compact way else: return _run_single(circuit, device, values, display_breakpoints) @@ -409,8 +540,11 @@ def namer(circ: QCircuit, i: int): def submit( circuit: QCircuit, device: AvailableDevice, - values: Optional[dict[Expr | str, Complex]] = None, + values: Optional[ValuesDict] = None, + mode: Optional[ExecutionMode] = None, + reservation_arn: Optional[str] = None, ) -> tuple[str, Job]: + # TODO replace reservation_arn + docstring """Submit the job related to the circuit on the remote backend provided in parameter. The submission returns a ``job_id`` that can be used to retrieve the :class:`~mpqp.execution.result.Result` later using the @@ -449,15 +583,31 @@ def submit( "submit(...) function is only made for remote device." ) - job = generate_job(circuit, device, values) + job = generate_job(circuit, device, values, mode) job.status = JobStatus.INIT if isinstance(device, IBMDevice): - job_id, _ = submit_remote_ibm(job) + # TODO: we said that provider specific stuff should only go into the provider specific execution file , + # here ibm.py, to keep the logic simple on runner.py + if mode == ExecutionMode.SESSION: + from mpqp.execution.connection.ibm_connection import ( + get_backend, + get_or_create_ibm_session, + ) + from mpqp.execution.providers.ibm import submit_remote_ibm_session + + backend = get_backend(device) + session = get_or_create_ibm_session(backend) + job_id, _ = submit_remote_ibm_session(job, session) + else: + from mpqp.execution.providers.ibm import submit_remote_ibm + + job_id, _ = submit_remote_ibm(job) + elif isinstance(device, ATOSDevice): job_id, _ = submit_QLM(job) elif isinstance(device, AWSDevice): - job_id, _ = submit_job_braket(job) + job_id, _ = submit_job_braket(job, reservation_arn=reservation_arn) elif isinstance(device, AZUREDevice): job_id, _ = submit_job_azure(job) else: diff --git a/mpqp/execution/vqa/optimizer.py b/mpqp/execution/vqa/optimizer.py index 028857c2..e069dbdd 100644 --- a/mpqp/execution/vqa/optimizer.py +++ b/mpqp/execution/vqa/optimizer.py @@ -2,14 +2,84 @@ :class:`Optimizer` enum lists all the methods validated with the rest of the library.""" -from enum import Enum, auto +from enum import Enum +from functools import partial +from typing import Any, Callable, Optional, Sequence, Union + +import numpy as np +import numpy.typing as npt +from scipy.optimize import OptimizeResult +from scipy.optimize import minimize as scipy_minimize + +OptimizerInput = Union[list[float], npt.NDArray[np.float_]] +OptimizableFunc = Union[partial[float], Callable[[OptimizerInput], float]] +OptimizerOptions = dict[str, Any] +# OptimizerCallback = Callable[[OptimizerInput, float], None] class Optimizer(Enum): """Enum used to select the optimizer for the VQA.""" - BFGS = auto() - COBYLA = auto() - CMAES = auto() - POWELL = auto() - # NELDER-MEAD = auto() + BFGS = "BFGS" + L_BFGS_B = "L-BFGS-B" + COBYLA = "COBYLA" + POWELL = "POWELL" + NELDER_MEAD = "Nelder-Mead" + SLSQP = "SLSQP" + + CMAES = "CMAES" + + +def run_optimizer( + eval_func: OptimizableFunc, + method: Optimizer, + init_params: OptimizerInput, + optimizer_options: Optional[OptimizerOptions] = None, + callback: Optional[Callable[[OptimizerInput], None]] = None, + batch_eval: Optional[ + Callable[[Sequence[npt.NDArray[np.float_]]], Sequence[float]] + ] = None, +) -> tuple[float, npt.NDArray[np.float_]]: + + if optimizer_options is None: + optimizer_options = {} + + x0 = np.asarray(init_params, dtype=float) + + if method == Optimizer.CMAES: + import cma + + sigma0 = float(optimizer_options.pop("sigma0", 0.5)) + es = cma.CMAEvolutionStrategy([float(x) for x in x0], sigma0, optimizer_options) + + best_value = float("inf") + best_params = np.asarray(x0, dtype=float) + + while not es.stop(): + solutions = es.ask() + + if batch_eval is not None: + candidates = [np.asarray(x, dtype=float) for x in solutions] + fit = [float(v) for v in batch_eval(candidates)] + else: + fit = [float(eval_func(np.asarray(x, dtype=float))) for x in solutions] + + es.tell(solutions, fit) + es.disp() + + if callback is not None: + callback(np.asarray(es.best.x, dtype=float)) + + return float(best_value), np.asarray(best_params, dtype=float) + + result: OptimizeResult = scipy_minimize( + eval_func, + x0=x0, + method=method.value, + options=optimizer_options, + callback=callback, + ) + best_value = float(result.fun) + best_params = np.asarray(result.x, dtype=float) + + return best_value, best_params diff --git a/mpqp/execution/vqa/vqa.py b/mpqp/execution/vqa/vqa.py index b9c9719f..592dc346 100644 --- a/mpqp/execution/vqa/vqa.py +++ b/mpqp/execution/vqa/vqa.py @@ -1,7 +1,7 @@ from __future__ import annotations from functools import partial -from typing import TYPE_CHECKING, Any, Callable, Collection, Optional, TypeVar, Union +from typing import Any, Callable, Collection, Optional, Sequence, TypeVar, Union import numpy as np import numpy.typing as npt @@ -14,8 +14,9 @@ from mpqp.core.circuit import QCircuit from mpqp.core.instruction import ExpectationMeasure from mpqp.execution.devices import AvailableDevice +from mpqp.execution.job import ExecutionMode from mpqp.execution.runner import run -from mpqp.execution.vqa.optimizer import Optimizer +from mpqp.execution.vqa.optimizer import Optimizer, run_optimizer T1 = TypeVar("T1") T2 = TypeVar("T2") @@ -41,7 +42,7 @@ def _maps(l1: Collection[T1], l2: Collection[T2]) -> dict[T1, T2]: """Does like zip, but with a dictionary instead of a list of tuples""" if len(l1) != len(l2): - ValueError( + raise ValueError( f"Length of the two collections are not equal ({len(l1)} and {len(l2)})." ) return {e1: e2 for e1, e2 in zip(l1, l2)} @@ -55,6 +56,7 @@ def minimize( nb_params: Optional[int] = None, optimizer_options: Optional[dict[str, Any]] = None, callback: Optional[OptimizerCallback] = None, + mode: ExecutionMode = ExecutionMode.JOB, ) -> tuple[float, OptimizerInput]: """This function runs an optimization on the parameters of the circuit, in order to minimize the measured expectation value of observables associated with the given circuit. @@ -122,30 +124,19 @@ def minimize( (8.881784197001252e-16, array([0., 0.])) """ - if isinstance(optimizable, QCircuit): - if device is None: - raise ValueError("A device is needed to optimize a circuit") - optimizer = _minimize_remote if device.is_remote() else _minimize_local - return optimizer( - optimizable, - method, - device, - init_params, - nb_params, - optimizer_options, - callback, - ) - else: - # TODO: find a way to know if the job is remote or local from the function - return _minimize_local( - optimizable, - method, - device, - init_params, - nb_params, - optimizer_options, - callback, - ) + if device is None: + raise ValueError("A device is needed to optimize a circuit") + optimizer = _minimize_remote if device.is_remote() else _minimize_local + return optimizer( + optimizable, + method, + device, + init_params, + nb_params, + optimizer_options, + callback, + mode, + ) def _minimize_remote( @@ -156,6 +147,7 @@ def _minimize_remote( nb_params: Optional[int] = None, optimizer_options: Optional[dict[str, Any]] = None, callback: Optional[OptimizerCallback] = None, + mode: Optional[ExecutionMode] = ExecutionMode.JOB, ) -> tuple[float, OptimizerInput]: """This function runs an optimization on the parameters of the circuit, to minimize the expectation value of the measure of the circuit by it's @@ -199,6 +191,7 @@ def _minimize_local( nb_params: Optional[int] = None, optimizer_options: Optional[dict[str, Any]] = None, callback: Optional[OptimizerCallback] = None, + mode: Optional[ExecutionMode] = ExecutionMode.JOB, ) -> tuple[float, OptimizerInput]: """This function runs an optimization on the parameters of the circuit, to minimize the expectation value of the measure of the circuit by it's @@ -233,11 +226,23 @@ def _minimize_local( if device is None: raise ValueError("A device is needed to optimize a circuit") return _minimize_local_circ( - optimizable, device, method, init_params, optimizer_options, callback + optimizable, + device, + method, + init_params, + optimizer_options, + callback, + mode, ) else: return _minimize_local_func( - optimizable, method, init_params, nb_params, optimizer_options, callback + optimizable, + method, + init_params, + nb_params, + optimizer_options, + callback, + mode, ) @@ -274,6 +279,7 @@ def _minimize_local_circ( Returns: The optimal value reached and the parameters used to reach this value. """ + # TODO: rework all this, minimize_local shouldn't have been touched at all in principle, only the remote one # The sympy `free_symbols` method returns in fact sets of Basic, which # are theoretically different from Expr, but in our case the difference # is not relevant. @@ -284,30 +290,70 @@ def _minimize_local_circ( if not isinstance(circ.measurements[0], ExpectationMeasure): raise ValueError("Expected an ExpectationMeasure to optimize the circuit.") - else: - if len(circ.measurements[0].observables) > 1: - raise ValueError( - "Expected only one observable in the ExpectationMeasure but got" - f" {len(circ.measurements[0].observables)}" - ) - - def eval_circ(params: OptimizerInput): - # pyright is bad with abstract numeric types: - # "float" is incompatible with "Complex" - from numbers import Complex - params_fixed_type: Collection[Complex] = ( - params # pyright: ignore[reportAssignmentType] + if len(circ.measurements[0].observables) > 1: + raise ValueError( + "Expected only one observable in the ExpectationMeasure but got" + f" {len(circ.measurements[0].observables)}" ) - values: dict[Expr | str, Complex] = _maps(variables, params_fixed_type) - result = run(circ, device, values) - if TYPE_CHECKING: - assert isinstance(result.expectation_values, float) - return result.expectation_values + variables = sorted(circ.variables(), key=str) + + exec_mode = mode or ExecutionMode.JOB + if exec_mode == ExecutionMode.BATCH and not ( + isinstance(method, Optimizer) and method == Optimizer.CMAES + ): + raise ValueError("Batch mode is supported with CMAES optimizer ") + + single_mode = ( + ExecutionMode.SESSION + if exec_mode == ExecutionMode.SESSION + else ExecutionMode.JOB + ) + + def eval_circ(params: OptimizerInput) -> float: + params_fixed = [complex(x) for x in params] + values = _maps(variables, params_fixed) + res = run(circ, device, values, mode=single_mode) + return float(res.expectation_values) + + batch_eval_fn: Optional[ + Callable[[Sequence[npt.NDArray[np.float_]]], Sequence[float]] + ] = None + + if ( + isinstance(method, Optimizer) + and method == Optimizer.CMAES + and exec_mode == ExecutionMode.BATCH + ): + + def _batch_eval( + candidates: Sequence[npt.NDArray[np.float_]], + ) -> Sequence[float]: + values_list = [ + _maps(variables, [complex(float(x)) for x in cand]) + for cand in candidates + ] + batch_res = run( + circ, + device, + values=values_list, + mode=ExecutionMode.BATCH, + display_breakpoints=False, + ) + + return [float(res.expectation_values) for res in batch_res] + + batch_eval_fn = _batch_eval return _minimize_local_func( - eval_circ, method, init_params, len(variables), optimizer_options, callback + eval_circ, + method, + init_params, + len(variables), + optimizer_options, + callback, + batch_eval=batch_eval_fn, ) @@ -318,7 +364,11 @@ def _minimize_local_func( nb_params: Optional[int] = None, optimizer_options: Optional[OptimizerOptions] = None, callback: Optional[OptimizerCallback] = None, + batch_eval: Optional[ + Callable[[Sequence[npt.NDArray[np.float_]]], Sequence[float]] + ] = None, ) -> tuple[float, OptimizerInput]: + # TODO: rework all this, minimize_local shouldn't have been touched at all in principle, only the remote one """This function runs an optimization on the parameters of the circuit, to minimize the expectation value of the measure of the circuit by it's observables. Note that this means that the circuit should contain an @@ -356,14 +406,30 @@ def _minimize_local_func( else: init_params = [0.0] * nb_params + def _optimizer_callback(x: OptimizerInput) -> None: + if callback is None: + return + + if isinstance(x, OptimizeResult): + callback(x) + return + + try: + callback(OptimizeResult(x=np.array(x, dtype=float))) + except Exception: + callback(x) + if isinstance(method, Optimizer): - res: OptimizeResult = scipy_minimize( + best_value, best_params = run_optimizer( eval_func, - x0=np.array(init_params), - method=method.name.lower(), - options=optimizer_options, - callback=callback, + method, + init_params, + optimizer_options, + _optimizer_callback if callback is not None else None, + batch_eval=batch_eval, ) - return float(res.fun), res.x - else: - return method(eval_func, init_params, optimizer_options) + return best_value, best_params + + best_value, best_params = method(eval_func, init_params, optimizer_options) + + return float(best_value), np.asarray(best_params, dtype=float) diff --git a/tests/execution/test_validity.py b/tests/execution/test_validity.py index 0cd1e3e8..7ff73bc9 100644 --- a/tests/execution/test_validity.py +++ b/tests/execution/test_validity.py @@ -39,9 +39,7 @@ from mpqp.noise.noise_model import NOISE_MODELS from mpqp.tools import Matrix, atol, rand_hermitian_matrix, rtol from mpqp.tools.circuit import random_gate, random_noise -from mpqp.tools.errors import ( - DeviceJobIncompatibleError, -) +from mpqp.tools.errors import DeviceJobIncompatibleError from mpqp.tools.maths import matrix_eq pi = np.pi