Source code for qiskit_qulacs.qulacs_job
"""Qulacs Job"""
from qiskit.providers import JobError, JobStatus
from qiskit.providers import JobV1 as Job
from .backend_utils import DEFAULT_EXECUTOR, requires_submit
[docs]
class QulacsJob(Job):
    """Job handle that runs a Qulacs simulation callable through an executor.

    The job keeps the callable together with the circuits, states and run
    options it will be invoked with. Nothing is executed until ``submit`` is
    called; after that the future returned by the executor is what backs
    ``result``, ``cancel`` and ``status``.
    """

    def __init__(
        self,
        backend,
        job_id,
        fn,
        circuits,
        states,
        run_options=None,
        executor=None,
    ) -> None:
        """Store everything needed to run the job later.

        Args:
            backend: backend this job belongs to (forwarded to the base class).
            job_id: identifier of the job (forwarded to the base class).
            fn: callable handed to the executor on ``submit``.
            circuits: circuits passed to ``fn`` as its first argument.
            states: states passed to ``fn`` as its second argument.
            run_options: options passed to ``fn`` as its third argument.
            executor: object exposing ``submit``; any falsy value selects
                ``DEFAULT_EXECUTOR``.
        """
        super().__init__(backend, job_id)
        # No future exists until submit() is called.
        self._future = None
        self._executor = executor if executor else DEFAULT_EXECUTOR
        self._fn = fn
        self._circuits = circuits
        self._states = states
        self._run_options = run_options

    def submit(self):
        """Hand the stored callable and its arguments to the executor.

        Raises:
            JobError: if this job already holds a future, i.e. it was
                submitted before.
        """
        already_submitted = self._future is not None
        if already_submitted:
            raise JobError("Qulacs job has already been submitted.")

        dispatch = self._executor.submit
        self._future = dispatch(
            self._fn,
            self._circuits,
            self._states,
            self._run_options,
            self._job_id,
        )

    @requires_submit
    def result(self, timeout=None):
        """Return the job result by delegating to the underlying future.

        Args:
            timeout: forwarded unchanged to the future's ``result`` call.

        Returns:
            Whatever the future's ``result`` call returns; blocking, timeout
            and error behavior are those of the underlying future object.
        """
        pending = self._future
        return pending.result(timeout=timeout)

    @requires_submit
    def cancel(self):
        """Attempt to cancel the job and return the future's ``cancel()`` value."""
        pending = self._future
        return pending.cancel()

    @requires_submit
    def status(self):
        """Translate the state of the underlying future into a ``JobStatus``.

        Returns:
            ``RUNNING``, ``CANCELLED``, ``DONE``, ``ERROR`` or
            ``INITIALIZING`` depending on what the future reports.
        """
        pending = self._future

        # The probes must stay in this order: running, cancelled, done.
        if pending.running():
            return JobStatus.RUNNING
        if pending.cancelled():
            return JobStatus.CANCELLED
        if not pending.done():
            # Neither running, cancelled nor finished yet.
            return JobStatus.INITIALIZING

        # Finished: a stored exception distinguishes failure from success.
        if pending.exception() is None:
            return JobStatus.DONE
        return JobStatus.ERROR

    def backend(self):
        """Return the instance of the backend used for this job."""
        return self._backend

    def circuits(self):
        """Return the circuits that were handed to this job.

        Returns:
            list of QuantumCircuit: the circuits object given at construction.
        """
        return self._circuits