import signal
import time
import datetime
import asyncio
from decimal import Decimal
from enum import Enum
from typing import List, Optional, Dict, Union
from collections import defaultdict
import django.db.transaction
from django.core.management import BaseCommand
from django.db.models import Q
from stellar_sdk import (
Keypair,
ServerAsync,
MuxedAccount,
TransactionEnvelope,
CreateAccount,
CreateClaimableBalance,
)
from stellar_sdk.client.aiohttp_client import AiohttpClient
from stellar_sdk.exceptions import ConnectionError
from asgiref.sync import sync_to_async
from polaris import settings
from polaris.utils import (
is_pending_trust,
maybe_make_callback,
maybe_make_callback_async,
get_account_obj_async,
)
from polaris.integrations import (
registered_deposit_integration as rdi,
registered_rails_integration as rri,
registered_custody_integration as rci,
registered_fee_func,
calculate_fee,
)
from polaris.exceptions import (
TransactionSubmissionPending,
TransactionSubmissionBlocked,
TransactionSubmissionFailed,
)
from polaris.models import Transaction, PolarisHeartbeat
from polaris.utils import getLogger
# Module-level logger, obtained through the getLogger helper imported from polaris.utils.
logger = getLogger(__name__)
class TransactionType(Enum):
    """
    Kind of Stellar transaction that a submission attempt produced.

    The value is used after a successful submission to choose between
    completing the deposit and handling a freshly created destination account.
    """

    # a payment (or claimable balance) delivering the deposited funds
    DEPOSIT = 1
    # a transaction that created the deposit's destination account
    CREATE_ACCOUNT = 2
# Names: the in-memory/database queue holding transactions awaiting submission,
# and the PolarisHeartbeat key used as this process's lock.
SUBMIT_TRANSACTION_QUEUE = "SUBMIT_TRANSACTION_QUEUE"
PROCESS_PENDING_DEPOSITS_LOCK_KEY = "PROCESS_PENDING_DEPOSITS_LOCK"

# Timing values, all in seconds.
DEFAULT_INTERVAL = 10  # pause between runs of each periodic task
DEFAULT_HEARTBEAT = 5  # pause between heartbeat updates
RECOVER_LOCK_LOWER_BOUND = 30  # minimum heartbeat age before a lock is taken over
class PolarisQueueAdapter:
    """
    Thin wrapper around a set of named in-memory ``asyncio.Queue`` objects
    that carry Transaction objects between the tasks in this module.
    """

    def __init__(self, queues):
        """
        Create one empty ``asyncio.Queue`` per name.

        @param: queues - iterable of queue names
        """
        self.queues: Dict[str, asyncio.Queue] = {
            name: asyncio.Queue() for name in queues
        }

    def populate_queues(self):
        """
        Read the database and refill the in-memory submission queue.

        Deposit transactions that were recorded as queued for submission
        ('ready' or 'processing') are re-queued oldest first, ordered by
        their ``queued_at`` timestamp.
        """
        logger.debug("initializing queues from database...")
        resumable_statuses = [
            Transaction.SUBMISSION_STATUS.ready,
            Transaction.SUBMISSION_STATUS.processing,
        ]
        deposit_kinds = [
            Transaction.KIND.deposit,
            getattr(Transaction.KIND, "deposit-exchange"),
        ]
        previously_queued = Transaction.objects.filter(
            queue=SUBMIT_TRANSACTION_QUEUE,
            submission_status__in=resumable_statuses,
            kind__in=deposit_kinds,
            queued_at__isnull=False,
        )
        previously_queued = previously_queued.order_by("queued_at").select_related(
            "asset"
        )
        logger.debug(
            f"found {len(previously_queued)} transactions to queue for submit_transaction_task"
        )
        for queued_transaction in previously_queued:
            self.queue_transaction(
                "populate_queues", SUBMIT_TRANSACTION_QUEUE, queued_transaction
            )

    def queue_transaction(self, source_task_name, queue_name, transaction):
        """
        Append a transaction to the named queue without waiting.

        @param: source_task_name - the task that queued this transaction
        @param: queue_name - name of the queue to put the Transaction in
        @param: transaction - the Transaction to put in the queue
        """
        logger.debug(
            f"{source_task_name} - putting transaction {transaction.id} into {queue_name}"
        )
        target_queue = self.queues[queue_name]
        target_queue.put_nowait(transaction)

    async def get_transaction(self, source_task_name, queue_name) -> Transaction:
        """
        Wait for the next transaction in the named queue and return it.

        @param: source_task_name - the task that is requesting a Transaction
        @param: queue_name - name of the queue to consume the Transaction from
        """
        logger.debug(f"{source_task_name} requesting task from queue: {queue_name}")
        source_queue = self.queues[queue_name]
        transaction = await source_queue.get()
        logger.debug(f"{source_task_name} got transaction: {transaction}")
        return transaction
class ProcessPendingDeposits:
    """
    Namespace for the coroutines and helpers that take deposit transactions
    from "picked up by poll_pending_deposits()" to "submitted to Stellar".

    process_pending_deposits() is the entry point: it runs the periodic
    check_* tasks, the heartbeat task and the submit task concurrently. The
    check_* tasks feed transactions into SUBMIT_TRANSACTION_QUEUE and
    submit_transaction_task() consumes them one at a time.

    Blocking Django ORM calls made from coroutines are wrapped in
    sync_to_async() throughout.
    """

    @classmethod
    async def check_rails_task(
        cls, queues: PolarisQueueAdapter, interval
    ):  # pragma: no cover
        """
        Periodically poll for deposit transactions that are ready to be
        processed and pass them on via check_rails_for_ready_transactions().

        @param: queues - adapter holding the SUBMIT_TRANSACTION_QUEUE
        @param: interval - seconds to sleep between polls
        """
        logger.debug("check_rails_task started...")
        while True:
            await cls.check_rails_for_ready_transactions(queues)
            await asyncio.sleep(interval)

    @classmethod
    async def check_rails_for_ready_transactions(cls, queues: PolarisQueueAdapter):
        """
        Fetch the deposits reported as ready and route them.

        If the registered custody integration cannot create accounts, the
        transactions are marked as pending funding and nothing is queued.
        Otherwise they are handed to check_accounts().
        """
        ready_transactions = await sync_to_async(cls.get_ready_deposits)()
        if not rci.account_creation_supported:
            pending_funding = Transaction.objects.filter(
                id__in=[t.id for t in ready_transactions]
            )
            # Building the queryset is lazy; update() is the call that hits the
            # database, so it must not run on the event loop thread.
            await sync_to_async(pending_funding.update)(
                # TODO
                # we don't have an external status that indicates the user or wallet
                # needs to fund the account. Placing in pending_user for now.
                status=Transaction.STATUS.pending_user,
                submission_status=Transaction.SUBMISSION_STATUS.pending_funding,
            )
            return
        await cls.check_accounts(queues, ready_transactions)

    @classmethod
    async def check_accounts_task(
        cls, queues: PolarisQueueAdapter, interval: int
    ):  # pragma: no cover
        """
        Periodically polls accounts to determine if they exist on the Stellar
        Network. If they do, the transaction is queued for deposit submission,
        otherwise the transaction remains in the same state and is polled again
        at the provided interval.

        This task is only necessary if the registered CustodyIntegration class
        does not support account creation.
        """
        logger.debug("check_accounts_task started...")
        while True:
            transactions = await sync_to_async(cls.get_unfunded_account_transactions)()
            await cls.check_accounts(queues, transactions)
            await asyncio.sleep(interval)

    @classmethod
    async def check_accounts(
        cls, queues: PolarisQueueAdapter, transactions: List[Transaction]
    ):
        """
        Load each transaction's destination account and decide its next step.

        - RuntimeError while loading (treated as "account not found"): the
          transaction is saved as ready and queued for submission.
        - ConnectionError while loading: the transaction is skipped.
        - Account loaded and either a trustline exists or claimable balances
          are supported: the transaction is saved as ready and queued.
        - Otherwise: the transaction is saved as pending_trust.
        """
        async with ServerAsync(settings.HORIZON_URI, client=AiohttpClient()) as server:
            for transaction in transactions:
                # NOTE(review): check_trustlines() resolves muxed ("M...") addresses
                # before building the Keypair and this method does not; confirm
                # whether muxed destinations can reach this point.
                try:
                    _, account_json = await get_account_obj_async(
                        Keypair.from_public_key(transaction.to_address), server
                    )
                except RuntimeError:
                    # account not found, submitting the transaction will take care of account creation
                    await sync_to_async(cls.save_as_ready_for_submission)(transaction)
                    queues.queue_transaction(
                        "check_accounts_task", SUBMIT_TRANSACTION_QUEUE, transaction
                    )
                    continue
                except ConnectionError:
                    continue
                if (
                    not is_pending_trust(transaction, account_json)
                    or transaction.claimable_balance_supported
                ):
                    await sync_to_async(cls.save_as_ready_for_submission)(transaction)
                    queues.queue_transaction(
                        "check_accounts_task", SUBMIT_TRANSACTION_QUEUE, transaction
                    )
                else:
                    await sync_to_async(cls.save_as_pending_trust)(transaction)

    @staticmethod
    def get_unfunded_account_transactions():
        """
        Return the deposit transactions whose submission_status is
        'pending_funding', as a list.
        """
        return list(
            Transaction.objects.filter(
                kind__in=[Transaction.KIND.deposit, "deposit-exchange"],
                submission_status=Transaction.SUBMISSION_STATUS.pending_funding,
            )
            .select_related("asset", "quote")
            .all()
        )

    @classmethod
    async def check_unblocked_transactions_task(
        cls, queues: PolarisQueueAdapter, interval: int
    ):
        """
        Get the transactions that are in an 'unblocked' submission_status and
        submit them to the SUBMIT_TRANSACTION_QUEUE for the submit_transaction_task
        to process.

        The 'unblocked' submission_status implies that Polaris previously saved the
        transaction as 'blocked' due to a TransactionSubmissionBlocked exception being
        raised by a function that submits transactions to the Stellar Network.
        Anchors could manually resolve an issue causing the transaction to enter
        the 'blocked' status and update the transaction to be "unblocked", which would
        allow Polaris to detect and resubmit it.
        """
        logger.debug("check_unblocked_transactions_task started...")
        while True:
            await cls.process_unblocked_transactions(queues)
            await asyncio.sleep(interval)

    @classmethod
    async def process_unblocked_transactions(cls, queues: PolarisQueueAdapter):
        """
        Save every transaction returned by get_unblocked_transactions() as
        'ready' and put it in the SUBMIT_TRANSACTION_QUEUE.
        """
        unblocked_transactions = await sync_to_async(cls.get_unblocked_transactions)()
        for transaction in unblocked_transactions:
            logger.info(
                f"check_unblocked_transactions_task - saving transaction {transaction.id} as 'ready'"
            )
            await sync_to_async(cls.save_as_ready_for_submission)(transaction)
            queues.queue_transaction(
                "check_unblocked_transactions_task",
                SUBMIT_TRANSACTION_QUEUE,
                transaction,
            )

    @staticmethod
    def save_as_ready_for_submission(transaction):
        """
        Record in the database that the transaction is queued for submission:
        sets the queue name, the queue timestamp (UTC, now), the
        pending_anchor status and the 'ready' submission status.
        """
        logger.debug(f"saving transaction: {transaction.id} as 'ready'")
        transaction.queue = SUBMIT_TRANSACTION_QUEUE
        transaction.queued_at = datetime.datetime.now(datetime.timezone.utc)
        transaction.status = Transaction.STATUS.pending_anchor
        transaction.submission_status = Transaction.SUBMISSION_STATUS.ready
        transaction.save()

    @classmethod
    async def check_trustlines_task(
        cls, queues: PolarisQueueAdapter, interval: int
    ):  # pragma: no cover
        """
        For all transactions that are pending_trust, load the destination
        account json to determine if a trustline has been
        established. If a trustline for the requested asset is found, the
        transaction is queued for submission.
        """
        logger.debug("check_trustlines_task started...")
        async with ServerAsync(settings.HORIZON_URI, client=AiohttpClient()) as server:
            while True:
                await cls.check_trustlines(queues, server)
                await asyncio.sleep(interval)

    @classmethod
    async def check_trustlines(cls, queues: PolarisQueueAdapter, server: ServerAsync):
        """
        Run one pass over the pending_trust transactions.

        Transactions whose destination account can be loaded and is no longer
        pending trust are saved as 'ready' and queued for submission. Any
        previously stored envelope_xdr (and stellar_transaction_id) is cleared
        first. Accounts that cannot be loaded are logged and skipped.
        """
        pending_trust_transactions: List[Transaction] = await sync_to_async(
            ProcessPendingDeposits.get_pending_trust_transactions
        )()
        for transaction in pending_trust_transactions:
            # Muxed addresses are resolved to the underlying account ID before
            # the account is looked up.
            if transaction.to_address.startswith("M"):
                destination_account = MuxedAccount.from_account(
                    transaction.to_address
                ).account_id
            else:
                destination_account = transaction.to_address
            try:
                _, account_json = await get_account_obj_async(
                    Keypair.from_public_key(destination_account), server
                )
            except ConnectionError:
                logger.exception(f"failed to load account {destination_account}")
                continue
            except RuntimeError:
                # check_accounts() treats RuntimeError from this helper as
                # "account not found". Skip the transaction rather than let the
                # exception propagate and stop every task in the process.
                logger.exception(f"failed to load account {destination_account}")
                continue
            if is_pending_trust(transaction=transaction, json_resp=account_json):
                continue
            logger.info(
                f"detected transaction {transaction.id} is no longer pending trust"
            )
            logger.debug(
                f"check_trustlines_task - saving transaction {transaction.id} as 'ready'"
            )
            if transaction.envelope_xdr:
                logger.info(
                    f"clearing submitted envelope_xdr for transaction {transaction.id}, "
                    f"envelope_xdr: {transaction.envelope_xdr}"
                )
                transaction.envelope_xdr = None
                transaction.stellar_transaction_id = None
            await sync_to_async(cls.save_as_ready_for_submission)(transaction)
            queues.queue_transaction(
                "check_trustlines_task", SUBMIT_TRANSACTION_QUEUE, transaction
            )

    @classmethod
    async def submit_transaction_task(
        cls, queues: PolarisQueueAdapter, locks: Dict
    ):  # pragma: no cover
        """
        Consume transactions from the SUBMIT_TRANSACTION_QUEUE forever, passing
        each one to submit_transaction() before requesting the next.
        """
        logger.debug("submit_transaction_task - running...")
        async with ServerAsync(settings.HORIZON_URI, client=AiohttpClient()) as server:
            while True:
                transaction = await queues.get_transaction(
                    "submit_transaction_task", SUBMIT_TRANSACTION_QUEUE
                )
                await cls.submit_transaction(transaction, server, locks, queues)

    @classmethod
    async def submit_transaction(
        cls,
        transaction: Transaction,
        server: ServerAsync,
        locks: Dict,
        queues: PolarisQueueAdapter,
    ):
        """
        Call submit() for the transaction until it reaches a final outcome.

        - TransactionSubmissionPending: the exception is recorded and submit()
          is called again immediately.
        - TransactionSubmissionBlocked / TransactionSubmissionFailed: the
          exception is recorded and the loop ends.
        - Any other exception: the transaction is placed in the error status,
          a callback is attempted and the loop ends.
        """
        attempt = 1
        while True:
            logger.debug(
                f"submit_transaction_task calling submit() for transaction {transaction.id}, "
                f"attempt #{attempt}"
            )
            try:
                await ProcessPendingDeposits.submit(transaction, server, locks, queues)
            except TransactionSubmissionPending as e:
                await sync_to_async(cls.handle_submission_exception)(transaction, e)
                attempt += 1
                continue
            except (TransactionSubmissionBlocked, TransactionSubmissionFailed) as e:
                await sync_to_async(cls.handle_submission_exception)(transaction, e)
            except Exception as e:
                logger.exception("submit() threw an unexpected exception")
                message = getattr(e, "message", str(e))
                await sync_to_async(ProcessPendingDeposits.handle_error)(
                    transaction, f"{e.__class__.__name__}: {message}"
                )
                await maybe_make_callback_async(transaction)
            break

    @classmethod
    def get_ready_deposits(cls) -> List[Transaction]:
        """
        Polaris' API server processes deposit requests and places the associated
        Transaction object in the `pending_user_transfer_start` status when all
        information necessary to submit the payment operation on Stellar has been
        collected.

        This function queries for these transactions, in addition to the Transaction
        objects that have been identified as pending external rails, and passes them
        to the poll_pending_deposits() integration function. Anchors return the
        transactions that are now available in their off-chain account and therefore
        ready for submission to the Stellar Network. Finally, this function performs
        various validations to ensure the transaction is truly ready and returns them.
        """
        pending_deposits = Transaction.objects.filter(
            status__in=[
                Transaction.STATUS.pending_user_transfer_start,
                Transaction.STATUS.pending_external,
            ],
            kind__in=[
                Transaction.KIND.deposit,
                getattr(Transaction.KIND, "deposit-exchange"),
            ],
        ).select_related("asset", "quote")
        ready_transactions = rri.poll_pending_deposits(pending_deposits)
        verified_ready_transactions = []
        for transaction in ready_transactions:
            if transaction.amount_fee is None or transaction.amount_out is None:
                if transaction.quote:
                    logger.error(
                        f"transaction {transaction.id} uses a quote but was returned "
                        "from poll_pending_deposits() without amount_fee or amount_out "
                        "assigned, skipping"
                    )
                    continue
                logger.warning(
                    f"transaction {transaction.id} was returned from "
                    f"poll_pending_deposits() without Transaction.amount_fee or "
                    f"Transaction.amount_out assigned. Future Polaris "
                    "releases will not calculate fees and delivered amounts"
                )
            # Reload the row, then re-attach the related objects that were
            # already loaded on this instance.
            asset = transaction.asset
            quote = transaction.quote
            transaction.refresh_from_db()
            transaction.asset = asset
            transaction.quote = quote
            if transaction.kind not in [
                transaction.KIND.deposit,
                getattr(transaction.KIND, "deposit-exchange"),
            ]:
                cls.handle_error(
                    transaction,
                    "poll_pending_deposits() returned a non-deposit transaction",
                )
                maybe_make_callback(transaction)
                continue
            if transaction.amount_in is None:
                cls.handle_error(
                    transaction,
                    "poll_pending_deposits() did not assign a value to the "
                    "amount_in field of a Transaction object returned",
                )
                maybe_make_callback(transaction)
                continue
            elif transaction.amount_fee is None:
                # Only compute a fee when the default fee function is registered;
                # in every other case the fee falls back to zero.
                if registered_fee_func is calculate_fee:
                    try:
                        transaction.amount_fee = calculate_fee(
                            fee_params={
                                "amount": transaction.amount_in,
                                "operation": settings.OPERATION_DEPOSIT,
                                "asset_code": transaction.asset.code,
                            }
                        )
                    except ValueError:
                        transaction.amount_fee = Decimal(0)
                else:
                    transaction.amount_fee = Decimal(0)
            transaction.save()
            verified_ready_transactions.append(transaction)
        return verified_ready_transactions

    @staticmethod
    def get_pending_trust_transactions():
        """
        If the destination account does not have a trustline to the requested
        asset and the client application that initiated the request does not
        support claimable balances, Polaris places the transaction in the
        `pending_trust` status.

        The returned transactions will be submitted if their destination
        accounts now have a trustline to the asset.
        """
        return list(
            Transaction.objects.filter(
                kind__in=[Transaction.KIND.deposit, "deposit-exchange"],
                status=Transaction.STATUS.pending_trust,
                submission_status=Transaction.SUBMISSION_STATUS.pending_trust,
            ).select_related("asset", "quote")
        )

    @staticmethod
    def get_unblocked_transactions():
        """
        Return deposit transactions that should be (re)queued for submission.

        A transaction matches if its submission_status is 'unblocked', or if it
        is pending_anchor with an envelope_xdr saved and pending_signatures set
        to False. Transactions already 'ready' or 'processing' are excluded.
        """
        unblocked = Q(submission_status=Transaction.SUBMISSION_STATUS.unblocked)
        got_signatures = Q(
            pending_signatures=False,
            envelope_xdr__isnull=False,
            status=Transaction.STATUS.pending_anchor,
        )
        unblocked_transactions = list(
            Transaction.objects.filter(
                unblocked | got_signatures,
                kind__in=[Transaction.KIND.deposit, "deposit-exchange"],
            )
            .select_related("asset", "quote")
            .exclude(
                submission_status__in=[
                    Transaction.SUBMISSION_STATUS.ready,
                    Transaction.SUBMISSION_STATUS.processing,
                ]
            )
        )
        for transaction in unblocked_transactions:
            logger.info(f"detected unblocked transaction: {transaction.id}")
        return unblocked_transactions

    @classmethod
    async def submit(
        cls,
        transaction: Transaction,
        server: ServerAsync,
        locks,
        queues: PolarisQueueAdapter,
    ):
        """
        Make one submission attempt for the transaction.

        The transaction is saved as pending_anchor / 'processing'. Then,
        while holding the distribution account's lock (when the custody
        integration exposes a distribution account), either the destination
        account is created or the deposit transaction is submitted through the
        custody integration. If the destination has no trustline and claimable
        balances are not supported, the transaction is saved as pending_trust
        and the method returns without submitting.

        After submission the resulting transaction is fetched from Horizon and
        handled as a success or an error.

        Raises ValueError if the transaction is in an unexpected status.
        Exceptions raised by the custody integration are propagated.
        """
        valid_statuses = [
            Transaction.STATUS.pending_user_transfer_start,
            Transaction.STATUS.pending_external,
            Transaction.STATUS.pending_anchor,
            Transaction.STATUS.pending_trust,
        ]
        if transaction.status not in valid_statuses:
            raise ValueError(
                f"Unexpected transaction status: {transaction.status}, expecting "
                f"{' or '.join(valid_statuses)}."
            )
        logger.info(f"initiating submission for {transaction.id}")
        transaction.status = Transaction.STATUS.pending_anchor
        transaction.submission_status = Transaction.SUBMISSION_STATUS.processing
        await sync_to_async(transaction.save)()
        await maybe_make_callback_async(transaction)

        # Set only when a lock was actually acquired; released in the finally below.
        source_account_lock = None
        try:
            distribution_account = await sync_to_async(rci.get_distribution_account)(
                asset=transaction.asset
            )
        except NotImplementedError:
            # Polaris has to assume that the custody service provider can handle concurrent
            # requests to send funds to destination accounts since it does not have a dedicated
            # distribution account.
            pass
        else:
            # Acquire a lock for the source account of the transaction that will create the
            # deposit's destination account.
            logger.debug(
                f"requesting lock to submit deposit transaction {transaction.id}"
            )
            source_account_lock = locks["source_accounts"][distribution_account]
            await source_account_lock.acquire()
            logger.debug(
                f"locked to submit deposit transaction for transaction {transaction.id}"
            )

        try:
            # NOTE(review): check_trustlines() resolves muxed ("M...") addresses
            # before building the Keypair and this method does not; confirm
            # whether muxed destinations can reach this point.
            try:
                _, destination_account_json = await get_account_obj_async(
                    Keypair.from_public_key(transaction.to_address), server
                )
            except RuntimeError:
                logger.info(
                    f"destination account: {transaction.to_address} not found, creating account..."
                )
                transaction_type = TransactionType.CREATE_ACCOUNT
                transaction_hash = await sync_to_async(rci.create_destination_account)(
                    transaction=transaction
                )
            else:
                has_trustline = not is_pending_trust(
                    transaction, destination_account_json
                )
                if not has_trustline and not transaction.claimable_balance_supported:
                    transaction.queue = None
                    transaction.queued_at = None
                    await sync_to_async(cls.save_as_pending_trust)(transaction)
                    # the finally clause releases the source account lock
                    return
                if transaction.envelope_xdr:
                    # If this is a multisig distribution account and there are two or more deposit
                    # transaction for the same destination account two "create_destination_account"
                    # transactions will be made for the same destination account. We already checked
                    # if the account exists so if we get to this part of the code and we see another
                    # create account operation, we clear the envelope_xdr and allow
                    # submit_deposit_transaction to generate a new transaction envelope.
                    signed_transaction = TransactionEnvelope.from_xdr(
                        transaction.envelope_xdr,
                        network_passphrase=settings.STELLAR_NETWORK_PASSPHRASE,
                    ).transaction
                    if any(
                        isinstance(op, CreateAccount)
                        for op in signed_transaction.operations
                    ):
                        transaction.envelope_xdr = None
                        await sync_to_async(transaction.save)()
                transaction_type = TransactionType.DEPOSIT
                transaction_hash = await sync_to_async(rci.submit_deposit_transaction)(
                    transaction=transaction, has_trustline=has_trustline
                )
        finally:
            if source_account_lock is not None and source_account_lock.locked():
                logger.debug(
                    "unlocking after attempting submission for "
                    f"transaction {transaction.id}"
                )
                source_account_lock.release()

        transaction_json = (
            await server.transactions().transaction(transaction_hash).call()
        )
        if not transaction_json.get("successful"):
            await sync_to_async(cls.handle_error)(
                transaction,
                "transaction submission failed unexpectedly: "
                f"{transaction_json['result_xdr']}",
            )
            await maybe_make_callback_async(transaction)
        else:
            await cls.handle_successful_transaction(
                transaction_json=transaction_json,
                transaction=transaction,
                transaction_type=transaction_type,
                queues=queues,
            )

    @classmethod
    async def handle_successful_transaction(
        cls,
        transaction_type: TransactionType,
        transaction_json: dict,
        transaction: Transaction,
        queues: PolarisQueueAdapter,
    ):
        """
        Dispatch a successful submission to the deposit handler or to the
        account-creation handler, depending on transaction_type.
        """
        if transaction_type == TransactionType.DEPOSIT:
            await cls.handle_successful_deposit(
                transaction_json=transaction_json,
                transaction=transaction,
            )
        else:
            await cls.handle_successful_account_creation(
                transaction=transaction, queues=queues
            )

    @classmethod
    async def handle_successful_deposit(
        cls, transaction_json: dict, transaction: Transaction
    ):
        """
        Mark the transaction as completed using the Horizon response, attempt
        a callback, then call the after_deposit() integration function.

        When the transaction has no quote, amount_out is computed as
        amount_in - amount_fee, rounded to the asset's significant decimals.
        A NotImplementedError from after_deposit() is ignored; any other
        exception it raises is logged and swallowed.
        """
        if transaction.claimable_balance_supported and rci.claimable_balances_supported:
            transaction.claimable_balance_id = cls.get_balance_id(transaction_json)
        transaction.paging_token = transaction_json["paging_token"]
        transaction.stellar_transaction_id = transaction_json["id"]
        transaction.status = Transaction.STATUS.completed
        transaction.submission_status = Transaction.SUBMISSION_STATUS.completed
        transaction.completed_at = datetime.datetime.now(datetime.timezone.utc)
        transaction.status_message = None
        transaction.queue = None
        transaction.queued_at = None
        if not transaction.quote:
            transaction.amount_out = round(
                Decimal(transaction.amount_in) - Decimal(transaction.amount_fee),
                transaction.asset.significant_decimals,
            )
        await sync_to_async(transaction.save)()
        logger.info(f"transaction {transaction.id} completed.")
        await maybe_make_callback_async(transaction)
        await sync_to_async(transaction.refresh_from_db)()
        try:
            await sync_to_async(rdi.after_deposit)(transaction=transaction)
        except NotImplementedError:
            pass
        except Exception:
            logger.exception("after_deposit() threw an unexpected exception")
        logger.info(f"deposit transaction: {transaction.id} successful")

    @classmethod
    async def handle_successful_account_creation(
        cls, transaction: Transaction, queues: PolarisQueueAdapter
    ):
        """
        Decide what happens after the destination account was created.

        If claimable balances are supported the transaction is re-queued for
        submission right away. Otherwise it is removed from the queue and
        saved as pending_trust.
        """
        logger.info(
            f"account: {transaction.to_address} successfully created for transaction: {transaction.id}"
        )
        if transaction.claimable_balance_supported:
            await sync_to_async(cls.save_as_ready_for_submission)(transaction)
            queues.queue_transaction(
                "submit_transaction_task", SUBMIT_TRANSACTION_QUEUE, transaction
            )
        else:
            transaction.queue = None
            transaction.queued_at = None
            transaction.status_message = None
            await sync_to_async(cls.save_as_pending_trust)(transaction)

    @staticmethod
    def save_as_pending_trust(transaction: Transaction):
        """
        Save the transaction with both status and submission_status set to
        pending_trust.
        """
        logger.debug(f"saving transaction: {transaction.id} as 'pending_trust'")
        transaction.status = Transaction.STATUS.pending_trust
        transaction.submission_status = Transaction.SUBMISSION_STATUS.pending_trust
        transaction.save()

    @staticmethod
    def get_balance_id(response: dict) -> Optional[str]:
        """
        Pulls claimable balance ID from horizon responses if present

        The hex representation of the balanceID is important because it
        is the representation required to query and claim claimableBalances.

        :param
            response: the response from horizon

        :return
            hex representation of the balanceID or None
        """
        envelope = TransactionEnvelope.from_xdr(
            response["envelope_xdr"], settings.STELLAR_NETWORK_PASSPHRASE
        )
        balance_id = None
        # only the first CreateClaimableBalance operation is considered
        for idx, op in enumerate(envelope.transaction.operations):
            if isinstance(op, CreateClaimableBalance):
                balance_id = envelope.transaction.get_claimable_balance_id(idx)
                break
        return balance_id

    @classmethod
    def handle_error(cls, transaction, message):
        """
        Remove the transaction from the queue, save it in the error status
        with the given status message, and log the error.
        """
        transaction.queue = None
        transaction.queued_at = None
        transaction.submission_status = Transaction.SUBMISSION_STATUS.failed
        transaction.status_message = message
        transaction.status = Transaction.STATUS.error
        transaction.save()
        logger.error(f"transaction: {transaction.id} encountered an error: {message}")

    @classmethod
    def handle_submission_exception(cls, transaction, exception):
        """
        Persist the outcome signalled by a TransactionSubmission* exception.

        - Blocked: removed from the queue, submission_status 'blocked'.
        - Failed: removed from the queue, status 'error', submission_status
          'failed'.
        - Pending: submission_status 'pending'; the transaction stays queued.

        In every case the exception's string form becomes the status message.
        """
        if isinstance(exception, TransactionSubmissionBlocked):
            transaction.queue = None
            transaction.queued_at = None
            transaction.submission_status = Transaction.SUBMISSION_STATUS.blocked
            logger.info(f"transaction {transaction.id} is blocked, removing from queue")
        elif isinstance(exception, TransactionSubmissionFailed):
            transaction.queue = None
            transaction.queued_at = None
            transaction.status = Transaction.STATUS.error
            transaction.submission_status = Transaction.SUBMISSION_STATUS.failed
            logger.info(
                f"transaction {transaction.id} submission failed, "
                f"placing in error status"
            )
        elif isinstance(exception, TransactionSubmissionPending):
            transaction.submission_status = Transaction.SUBMISSION_STATUS.pending
            logger.info(f"transaction {transaction.id} is pending, resubmitting")
        transaction.status_message = str(exception)
        transaction.save()

    @classmethod
    def update_heartbeat(cls, key):
        """
        Set last_heartbeat of the PolarisHeartbeat row(s) matching key to the
        current UTC time.
        """
        PolarisHeartbeat.objects.filter(key=key).update(
            last_heartbeat=datetime.datetime.now(datetime.timezone.utc)
        )

    @classmethod
    async def heartbeat_task(cls, key, heartbeat_interval):
        """
        Task that updates the heartbeat for the given key at a specified
        interval (heartbeat_interval, in seconds)
        """
        logger.debug("heartbeat_task started...")
        while True:
            await sync_to_async(ProcessPendingDeposits.update_heartbeat)(key)
            await asyncio.sleep(heartbeat_interval)

    @classmethod
    def acquire_lock(cls, key: str, heartbeat_interval: Union[int, float]):
        """
        Block until the lock identified by key is held by this process.

        The lock is a PolarisHeartbeat row. It is deleted when the process
        exits gracefully (see exit_gracefully()). If the row does not exist it
        is created and the lock is acquired immediately. If the row already
        exists, the lock is taken over only when its last heartbeat is older
        than 5x heartbeat_interval, with a lower bound of
        RECOVER_LOCK_LOWER_BOUND seconds. In that case it is assumed the
        process that created the row has crashed. Otherwise this function
        sleeps for heartbeat_interval seconds and tries again.

        This function blocks the calling thread with time.sleep() between
        attempts.
        """
        attempt = 1
        while True:
            logger.debug(
                f"attempting to acquire lock on key: {key}, attempt #{attempt}..."
            )
            with django.db.transaction.atomic():
                heartbeat, created = PolarisHeartbeat.objects.get_or_create(key=key)
                if created:
                    # if the heartbeat key was created, update the last_heartbeat field to the current time
                    heartbeat.last_heartbeat = datetime.datetime.now(
                        datetime.timezone.utc
                    )
                    heartbeat.save()
                    logger.debug(f"lock on key: {key} created")
                    return
                # the heartbeat key already exists (previous process did not shutdown gracefully), attempt
                # to acquire the lock based on time elapsed since the last heartbeat
                delta = (
                    datetime.datetime.now(datetime.timezone.utc)
                    - heartbeat.last_heartbeat
                )
                logger.debug(f"last heartbeat was {delta.total_seconds()} seconds ago")
                # the delta should be 5x the typical interval with a lower bound of 30 seconds
                if delta > max(
                    datetime.timedelta(seconds=heartbeat_interval * 5),
                    datetime.timedelta(seconds=RECOVER_LOCK_LOWER_BOUND),
                ):
                    heartbeat.last_heartbeat = datetime.datetime.now(
                        datetime.timezone.utc
                    )
                    heartbeat.save()
                    logger.debug(f"lock on key: {key} acquired")
                    return
            # Sleep only after the atomic block has exited so that no database
            # transaction is held open while waiting.
            logger.debug(
                f"unable to acquire lock on key: {key}, retrying in {heartbeat_interval} seconds..."
            )
            attempt += 1
            time.sleep(heartbeat_interval)

    @classmethod
    async def process_pending_deposits(  # pragma: no cover
        cls, task_interval: int, heartbeat_interval: int
    ):
        """
        Entry point coroutine: install the SIGINT/SIGTERM handlers, restore
        the in-memory queue from the database and run all tasks concurrently
        until they are cancelled.

        @param: task_interval - seconds between runs of each periodic task
        @param: heartbeat_interval - seconds between heartbeat updates
        """
        current_task = asyncio.current_task()

        def request_shutdown(signum, frame):
            # schedule the cleanup coroutine; it cancels every other task
            return asyncio.create_task(
                cls.exit_gracefully(signum, frame, current_task)
            )

        signal.signal(signal.SIGINT, request_shutdown)
        signal.signal(signal.SIGTERM, request_shutdown)

        queues = PolarisQueueAdapter([SUBMIT_TRANSACTION_QUEUE])
        await sync_to_async(queues.populate_queues)()
        # one asyncio.Lock per account, created lazily on first access
        locks = {
            "source_accounts": defaultdict(asyncio.Lock),
            "destination_accounts": defaultdict(asyncio.Lock),
        }
        try:
            await asyncio.gather(
                ProcessPendingDeposits.heartbeat_task(
                    PROCESS_PENDING_DEPOSITS_LOCK_KEY, heartbeat_interval
                ),
                ProcessPendingDeposits.check_rails_task(queues, task_interval),
                ProcessPendingDeposits.check_accounts_task(queues, task_interval),
                ProcessPendingDeposits.check_trustlines_task(queues, task_interval),
                ProcessPendingDeposits.check_unblocked_transactions_task(
                    queues, task_interval
                ),
                ProcessPendingDeposits.submit_transaction_task(queues, locks),
            )
        except asyncio.CancelledError:
            logger.debug("caught root task CancelledError...")

    @classmethod
    async def exit_gracefully(cls, signal_name, frame, root_task):  # pragma: no cover
        """
        Signal-triggered cleanup: delete the heartbeat lock row, then cancel
        every task except this one and root_task, and wait for them to finish.
        """
        logger.info(f"caught signal {signal_name}, cleaning up before exiting...")
        await sync_to_async(
            PolarisHeartbeat.objects.filter(
                key=PROCESS_PENDING_DEPOSITS_LOCK_KEY
            ).delete
        )()
        logger.debug(f"deleted heartbeat key: {PROCESS_PENDING_DEPOSITS_LOCK_KEY}...")
        current_task = asyncio.current_task()
        tasks = [
            task
            for task in asyncio.all_tasks()
            if task not in [current_task, root_task]
        ]
        for t in tasks:
            t.cancel()
        # return_exceptions=True so the CancelledError of each task is collected
        # instead of being raised here
        await asyncio.gather(*tasks, return_exceptions=True)
        logger.debug("all tasks have been canceled...")
class Command(BaseCommand):
    """
    This process handles all of the transaction submission logic for deposit
    transactions. When this command is invoked, Polaris queries the database for
    transactions in the following scenarios and processes them accordingly.

    A transaction is in the ``pending_user_transfer_start`` or ``pending_external`` status.
        Polaris passes these transactions to the
        :meth:`~polaris.integrations.RailsIntegration.poll_pending_deposits` integration
        function, and the anchor is expected to return :class:`~polaris.models.Transaction`
        objects whose funds have been received off-chain. Polaris then checks if each
        transaction is in one of the scenarios outlined below, and if not, submits the
        returned transactions to the Stellar network. See the
        :meth:`~polaris.integrations.RailsIntegration.poll_pending_deposits()` integration
        function for more details.

    A transaction's destination account does not have a trustline to the requested asset.
        Polaris checks if the trustline has been established. If it has, and the
        transaction's source account doesn't require multiple signatures, Polaris will
        submit the transaction to the Stellar Network.

    A transaction's source account requires multiple signatures before submission to the network.
        In this case, :attr:`~polaris.models.Transaction.pending_signatures` is set to
        ``True`` and the anchor is expected to collect signatures, save the transaction
        envelope to :attr:`~polaris.models.Transaction.envelope_xdr`, and set
        :attr:`~polaris.models.Transaction.pending_signatures` back to ``False``. Polaris
        will then query for these transactions and submit them to the Stellar network.

    **Optional arguments:**

        -h, --help            show this help message and exit
        --interval INTERVAL, -i INTERVAL
                              The number of seconds to wait before each internal
                              periodic task executes. Defaults to 10.
    """

    def add_arguments(self, parser):  # pragma: no cover
        """Register the ``--interval`` / ``-i`` command line option."""
        parser.add_argument(
            "--interval",
            "-i",
            type=int,
            # No argparse default is set: handle() falls back to DEFAULT_INTERVAL
            # when the option is omitted.
            help="The number of seconds to wait before each internal periodic task "
            "executes. Defaults to {}.".format(DEFAULT_INTERVAL),
        )

    def handle(self, *_args, **options):  # pragma: no cover
        """
        The entrypoint for the functionality implemented in this file.
        See diagram at polaris/docs/deployment

        Acquires the process lock (blocking until it is available), then runs
        the asyncio tasks until they are cancelled.
        """
        interval = options.get("interval") or DEFAULT_INTERVAL
        ProcessPendingDeposits.acquire_lock(
            PROCESS_PENDING_DEPOSITS_LOCK_KEY, DEFAULT_HEARTBEAT
        )
        asyncio.run(
            ProcessPendingDeposits.process_pending_deposits(interval, DEFAULT_HEARTBEAT)
        )
        logger.info("exiting after cleanup")