Source code for polaris.management.commands.execute_outgoing_transactions

import sys
import signal
import time
from decimal import Decimal
from datetime import datetime, timezone

import django.db.transaction
from django.db.models import Q
from django.core.management import BaseCommand

from polaris import settings
from polaris.integrations import registered_fee_func, calculate_fee
from polaris.utils import getLogger, maybe_make_callback
from polaris.models import Transaction
from polaris.integrations import registered_rails_integration as rri


logger = getLogger(__name__)
DEFAULT_INTERVAL = 30
TERMINATE = False


[docs]class Command(BaseCommand): """ This process periodically queries for transactions that are ready to be executed off-chain and calls Polaris’ :meth:`~polaris.integrations.RailsIntegration.execute_outgoing_transaction` integration function for each one. Ready transactions are those in ``pending_receiver`` or ``pending_anchor`` statuses, among other conditions. Anchors are expected to update the :attr:`~polaris.models.Transaction.status` to ``completed`` or ``pending_external`` if initiating the transfer was successful. **Optional arguments:** -h, --help show this help message and exit --loop Continually restart command after a specified number of seconds. --interval INTERVAL, -i INTERVAL The number of seconds to wait before restarting command. Defaults to 30. """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) signal.signal(signal.SIGINT, self.exit_gracefully) signal.signal(signal.SIGTERM, self.exit_gracefully) @staticmethod def exit_gracefully(sig, frame): # pragma: no cover logger.info("Exiting execute_outgoing_transactions...") module = sys.modules[__name__] module.TERMINATE = True @staticmethod def sleep(seconds): # pragma: no cover module = sys.modules[__name__] for _ in range(seconds): if module.TERMINATE: break time.sleep(1) def add_arguments(self, parser): # pragma: no cover parser.add_argument( "--loop", action="store_true", help="Continually restart command after a specified number of seconds.", ) parser.add_argument( "--interval", "-i", type=int, help=( "The number of seconds to wait before restarting command. " "Defaults to {}.".format(DEFAULT_INTERVAL) ), ) def handle(self, *_args, **options): # pragma: no cover module = sys.modules[__name__] if options.get("loop"): while True: if module.TERMINATE: break self.execute_outgoing_transactions() self.sleep(options.get("interval") or DEFAULT_INTERVAL) else: self.execute_outgoing_transactions() @staticmethod def execute_outgoing_transactions(): """ Execute pending withdrawals. """ module = sys.modules[__name__] sep31_qparams = Q( protocol=Transaction.PROTOCOL.sep31, status=Transaction.STATUS.pending_receiver, kind=Transaction.KIND.send, ) sep6_24_qparams = Q( protocol__in=[Transaction.PROTOCOL.sep24, Transaction.PROTOCOL.sep6], status=Transaction.STATUS.pending_anchor, kind__in=[ Transaction.KIND.withdrawal, getattr(Transaction.KIND, "withdrawal-exchange"), ], ) with django.db.transaction.atomic(): transactions = list( Transaction.objects.filter( sep6_24_qparams | sep31_qparams, pending_execution_attempt=False ).select_for_update() ) ids = [] for t in transactions: t.pending_execution_attempt = True ids.append(t.id) Transaction.objects.filter(id__in=ids).update( pending_execution_attempt=True ) if transactions: logger.info(f"Executing {len(transactions)} outgoing transactions") num_completed = 0 for i, transaction in enumerate(transactions): if module.TERMINATE: still_processing_transactions = transactions[i:] Transaction.objects.filter( id__in=[t.id for t in still_processing_transactions] ).update(pending_execution_attempt=False) break logger.info(f"Calling execute_outgoing_transaction() for {transaction.id}") try: rri.execute_outgoing_transaction(transaction) except NotImplementedError: logger.error( "RailsIntegration.execute_outgoing_transaction() is not implemented" ) module.TERMINATE = True break except Exception: transaction.pending_execution_attempt = False transaction.save() logger.exception( "execute_outgoing_transactions() threw an unexpected exception" ) continue transaction.refresh_from_db() if ( transaction.protocol == Transaction.PROTOCOL.sep31 and transaction.status == Transaction.STATUS.pending_receiver ) or ( transaction.protocol in [Transaction.PROTOCOL.sep24, Transaction.PROTOCOL.sep6] and transaction.status == transaction.STATUS.pending_anchor ): transaction.pending_execution_attempt = False if transaction.quote: transaction.quote.save() transaction.save() logger.error( f"Transaction {transaction.id} status must be " f"updated after call to execute_outgoing_transaction()" ) continue elif transaction.status in [ Transaction.STATUS.pending_external, Transaction.STATUS.completed, ]: if transaction.amount_fee is None or transaction.amount_out is None: if transaction.quote: err_msg = ( f"transaction {transaction.id} uses a quote but was returned " "from execute_outgoing_transaction() without amount_fee or amount_out " "assigned, skipping" ) logger.error(err_msg) transaction.message = err_msg transaction.pending_execution_attempt = False transaction.quote.save() transaction.save() continue logger.warning( f"transaction {transaction.id} was returned from execute_outgoing_transaction() " "without Transaction.amount_fee or Transaction.amount_out assigned. Future Polaris " "releases will not calculate fees and delivered amounts." ) if transaction.amount_fee is None: if not transaction.quote and registered_fee_func is calculate_fee: op = { Transaction.KIND.withdrawal: settings.OPERATION_WITHDRAWAL, getattr( Transaction.KIND, "withdrawal-exchange" ): settings.OPERATION_WITHDRAWAL, Transaction.KIND.send: settings.OPERATION_SEND, }[transaction.kind] try: transaction.amount_fee = calculate_fee( { "amount": transaction.amount_in, "operation": op, "asset_code": transaction.asset.code, } ) except ValueError: transaction.pending_execution_attempt = False transaction.save() logger.exception("Unable to calculate fee") continue else: transaction.amount_fee = Decimal(0) if not transaction.quote: transaction.amount_out = round( transaction.amount_in - transaction.amount_fee, transaction.asset.significant_decimals, ) # Anchors can mark transactions as pending_external if the transfer # cannot be completed immediately due to external processing. # poll_outgoing_transactions will check on these transfers and mark them # as complete when the funds have been received by the user. if transaction.status == Transaction.STATUS.completed: num_completed += 1 transaction.completed_at = datetime.now(timezone.utc) elif transaction.status not in [ Transaction.STATUS.error, Transaction.STATUS.pending_transaction_info_update, Transaction.STATUS.pending_customer_info_update, ]: transaction.pending_execution_attempt = False if transaction.quote: transaction.save() transaction.save() logger.error( f"Transaction {transaction.id} was moved to invalid status" f" {transaction.status}" ) continue transaction.pending_execution_attempt = False if transaction.quote: transaction.quote.save() transaction.save() maybe_make_callback(transaction) if num_completed: logger.info(f"{num_completed} transfers have been completed")