, automatic emails). // This cover cases when a welcome or automatic email was scheduled but before processing it the subscriber was deleted. // The non-campaign emails are sent only to a single recipient, and we count stats based on sending tasks statues, so we can't mark them as completed. // At the same time we want to keep a record abut processing them if ($subscriberBatches->count() === 0 && !in_array($newsletter->getType(), NewsletterEntity::CAMPAIGN_TYPES, true)) { $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->info( 'no subscribers to process', ['task_id' => $task->getId()] ); $this->scheduledTasksRepository->invalidateTask($task); return; } /** @var int[] $subscribersToProcessIds - it's required for PHPStan */ foreach ($subscriberBatches as $subscribersToProcessIds) { $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->info( 'subscriber batch processing', ['newsletter_id' => $newsletter->getId(), 'task_id' => $task->getId(), 'subscriber_batch_count' => count($subscribersToProcessIds)] ); if (!empty($newsletterSegmentsIds[0])) { // Check that subscribers are in segments try { $foundSubscribersIds = $this->subscribersFinder->findSubscribersInSegments($subscribersToProcessIds, $newsletterSegmentsIds, $filterSegmentId); } catch (InvalidStateException $exception) { $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->info( 'paused task in sending queue due to problem finding subscribers: ' . $exception->getMessage(), ['task_id' => $task->getId()] ); $task->setStatus(ScheduledTaskEntity::STATUS_PAUSED); $this->scheduledTasksRepository->flush(); return; } $foundSubscribers = empty($foundSubscribersIds) ? [] : $this->subscribersRepository->findBy(['id' => $foundSubscribersIds, 'deletedAt' => null]); } else { // No segments = Welcome emails or some Automatic emails. // Welcome emails or some Automatic emails use segments only for scheduling and store them as a newsletter option $queryBuilder = $this->entityManager->createQueryBuilder(); $queryBuilder->select('s') ->from(SubscriberEntity::class, 's') ->where('s.id IN (:subscriberIds)') ->setParameter('subscriberIds', $subscribersToProcessIds) ->andWhere('s.deletedAt IS NULL'); if ($newsletter->isTransactional()) { $queryBuilder->andWhere('s.status != :bouncedStatus') ->setParameter('bouncedStatus', SubscriberEntity::STATUS_BOUNCED); } else { $queryBuilder->andWhere('s.status = :subscribedStatus') ->setParameter('subscribedStatus', SubscriberEntity::STATUS_SUBSCRIBED); } $foundSubscribers = $queryBuilder->getQuery()->getResult(); $foundSubscribersIds = array_map(function(SubscriberEntity $subscriber) { return $subscriber->getId(); }, $foundSubscribers); } // if some subscribers weren't found, remove them from the processing list if (count($foundSubscribersIds) !== count($subscribersToProcessIds)) { $subscribersToRemove = array_diff( $subscribersToProcessIds, $foundSubscribersIds ); $this->scheduledTaskSubscribersRepository->deleteByScheduledTaskAndSubscriberIds($task, $subscribersToRemove); $this->sendingQueuesRepository->updateCounts($queue); // if there aren't any subscribers to process in the batch (e.g. all unsubscribed or were deleted), continue with the next batch if (count($foundSubscribersIds) === 0) { continue; } } $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->info( 'before queue chunk processing', ['newsletter_id' => $newsletter->getId(), 'task_id' => $task->getId(), 'found_subscribers_count' => count($foundSubscribers)] ); // reschedule bounce task to run sooner, if needed $this->reScheduleBounceTask(); // Check task has not been paused before continue processing // This is needed because the task can be paused in the middle of the batch processing, // for example on API error ERROR_MESSAGE_BULK_EMAIL_FORBIDDEN if ($task->getStatus() === ScheduledTaskEntity::STATUS_PAUSED) { return; } if ($newsletter->getStatus() !== NewsletterEntity::STATUS_CORRUPT) { $this->processQueue( $task, $newsletter, $foundSubscribers, $timer ); if (!$newsletter->isTransactional()) { $this->entityManager->wrapInTransaction(function() use ($foundSubscribersIds) { $now = Carbon::now()->millisecond(0); $this->subscribersRepository->bulkUpdateLastSendingAt($foundSubscribersIds, $now); // We're nullifying this value so these subscribers' engagement score will be recalculated the next time the cron runs $this->subscribersRepository->bulkUpdateEngagementScoreUpdatedAt($foundSubscribersIds, null); }); } $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->info( 'after queue chunk processing', ['newsletter_id' => $newsletter->getId(), 'task_id' => $task->getId()] ); // In case we finished end sending properly before enforcing sending and execution limits // The limit enforcing throws and exception and the sending end wouldn't be processed properly (stats notification, newsletter marked as sent etc.) if ($task->getStatus() === ScheduledTaskEntity::STATUS_COMPLETED) { $this->endSending($task, $newsletter); return; } $this->enforceSendingAndExecutionLimits($timer); } else { $this->sendingQueuesRepository->pause($queue); $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->error( 'Can\'t send corrupt newsletter', ['newsletter_id' => $newsletter->getId(), 'task_id' => $task->getId()] ); return; } } // At this point all batches were processed or there are no batches to process // Also none of the checks above paused or invalidated the task $this->endSending($task, $newsletter); } public function getBatchSize(): int { return $this->throttlingHandler->getBatchSize(); } /** * @param SubscriberEntity[] $subscribers */ public function processQueue(ScheduledTaskEntity $task, NewsletterEntity $newsletter, array $subscribers, $timer) { // determine if processing is done in bulk or individually $processingMethod = $this->mailerTask->getProcessingMethod(); $preparedNewsletters = []; $preparedSubscribers = []; $preparedSubscribersIds = []; $unsubscribeUrls = []; $statistics = []; $metas = []; $oneClickUnsubscribeUrls = []; $sendingQueueEntity = $task->getSendingQueue(); if (!$sendingQueueEntity) { return; } $sendingQueueMeta = $sendingQueueEntity->getMeta() ?? []; $campaignId = $sendingQueueMeta['campaignId'] ?? null; foreach ($subscribers as $subscriber) { // render shortcodes and replace subscriber data in tracked links $preparedNewsletters[] = $this->newsletterTask->prepareNewsletterForSending( $newsletter, $subscriber, $sendingQueueEntity ); // format subscriber name/address according to mailer settings $preparedSubscribers[] = $this->mailerTask->prepareSubscriberForSending( $subscriber ); $preparedSubscribersIds[] = $subscriber->getId(); // create personalized instant unsubsribe link $unsubscribeUrls[] = $this->links->getUnsubscribeUrl($sendingQueueEntity->getId(), $subscriber); $oneClickUnsubscribeUrls[] = $this->links->getOneClickUnsubscribeUrl($sendingQueueEntity->getId(), $subscriber); $metasForSubscriber = $this->mailerMetaInfo->getNewsletterMetaInfo($newsletter, $subscriber); if ($campaignId) { $metasForSubscriber['campaign_id'] = $campaignId; } $metas[] = $metasForSubscriber; // keep track of values for statistics purposes $statistics[] = [ 'newsletter_id' => $newsletter->getId(), 'subscriber_id' => $subscriber->getId(), 'queue_id' => $sendingQueueEntity->getId(), ]; if ($processingMethod === 'individual') { $this->sendNewsletter( $task, $preparedSubscribersIds[0], $preparedNewsletters[0], $preparedSubscribers[0], $statistics[0], $timer, [ 'unsubscribe_url' => $unsubscribeUrls[0], 'meta' => $metas[0], 'one_click_unsubscribe' => $oneClickUnsubscribeUrls[0], ] ); $preparedNewsletters = []; $preparedSubscribers = []; $preparedSubscribersIds = []; $unsubscribeUrls = []; $oneClickUnsubscribeUrls = []; $statistics = []; $metas = []; } } if ($processingMethod === 'bulk') { $this->sendNewsletters( $task, $preparedSubscribersIds, $preparedNewsletters, $preparedSubscribers, $statistics, $timer, [ 'unsubscribe_url' => $unsubscribeUrls, 'meta' => $metas, 'one_click_unsubscribe' => $oneClickUnsubscribeUrls, ] ); } } public function sendNewsletter( ScheduledTaskEntity $task, $preparedSubscriberId, $preparedNewsletter, $preparedSubscriber, $statistics, $timer, $extraParams = [] ) { // send newsletter $sendResult = $this->mailerTask->send( $preparedNewsletter, $preparedSubscriber, $extraParams ); $this->processSendResult( $task, $sendResult, [$preparedSubscriber], [$preparedSubscriberId], [$statistics], $timer ); } public function sendNewsletters( ScheduledTaskEntity $task, $preparedSubscribersIds, $preparedNewsletters, $preparedSubscribers, $statistics, $timer, $extraParams = [] ) { // send newsletters $sendResult = $this->mailerTask->sendBulk( $preparedNewsletters, $preparedSubscribers, $extraParams ); $this->processSendResult( $task, $sendResult, $preparedSubscribers, $preparedSubscribersIds, $statistics, $timer ); } /** * Checks whether some of segments was deleted or trashed * @param int[] $segmentIds */ private function checkDeletedSegments(array $segmentIds): bool { if (count($segmentIds) === 0) { return true; } $segmentIds = array_unique($segmentIds); $segments = $this->segmentsRepository->findBy(['id' => $segmentIds]); // Some segment was deleted from DB if (count($segmentIds) > count($segments)) { return false; } foreach ($segments as $segment) { if ($segment->getDeletedAt() !== null) { return false; } } return true; } private function processSendResult( ScheduledTaskEntity $task, $sendResult, array $preparedSubscribers, array $preparedSubscribersIds, array $statistics, $timer ) { // log error message and schedule retry/pause sending if ($sendResult['response'] === false) { $error = $sendResult['error']; $this->errorHandler->processError($error, $task, $preparedSubscribersIds, $preparedSubscribers); } else { $queue = $task->getSendingQueue(); if (!$queue) { return; } try { $this->scheduledTaskSubscribersRepository->updateProcessedSubscribers($task, $preparedSubscribersIds); $this->sendingQueuesRepository->updateCounts($queue); } catch (Throwable $e) { MailerLog::processError( 'processed_list_update', sprintf('QUEUE-%d-PROCESSED-LIST-UPDATE', $queue->getId()), null, true ); } } // log statistics $this->statisticsNewslettersRepository->createMultiple($statistics); // update the sent count $this->mailerTask->updateSentCount(); // enforce execution limits if queue is still being processed if ($task->getStatus() !== ScheduledTaskEntity::STATUS_COMPLETED) { $this->enforceSendingAndExecutionLimits($timer); } // trigger automation email sent hook for automation emails if ( $task->getStatus() === ScheduledTaskEntity::STATUS_COMPLETED && isset($task->getMeta()['automation']) ) { try { $this->wp->doAction('mailpoet_automation_email_sent', $task->getMeta()['automation']); } catch (Throwable $e) { $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->error( 'Error while executing "mailpoet_automation_email_sent action" hook', ['task_id' => $task->getId(), 'error' => $e->getMessage()] ); } } $this->throttlingHandler->processSuccess(); } public function enforceSendingAndExecutionLimits($timer) { // abort if execution limit is reached $this->cronHelper->enforceExecutionLimit($timer); // abort if sending limit has been reached MailerLog::enforceExecutionRequirements(); } private function reScheduleBounceTask() { $bounceTasks = $this->scheduledTasksRepository->findFutureScheduledByType(Bounce::TASK_TYPE); if (count($bounceTasks)) { $bounceTask = reset($bounceTasks); if (Carbon::now()->millisecond(0)->addHours(42)->lessThan($bounceTask->getScheduledAt())) { $randomOffset = rand(-6 * 60 * 60, 6 * 60 * 60); $bounceTask->setScheduledAt(Carbon::now()->millisecond(0)->addSeconds((36 * 60 * 60) + $randomOffset)); $this->scheduledTasksRepository->persist($bounceTask); $this->scheduledTasksRepository->flush(); } } } private function startProgress(ScheduledTaskEntity $task): void { $task->setInProgress(true); $this->scheduledTasksRepository->flush(); } private function stopProgress(ScheduledTaskEntity $task): void { // if task is not managed by entity manager, it's already deleted and detached // it can be deleted in self::processSending method if (!$this->entityManager->contains($task)) { return; } $task->setInProgress(false); $this->scheduledTasksRepository->flush(); } private function isTimeout(ScheduledTaskEntity $task): bool { $currentTime = Carbon::now()->millisecond(0); $updatedAt = new Carbon($task->getUpdatedAt()); if ($updatedAt->diffInSeconds($currentTime, false) > $this->getExecutionLimit()) { return true; } return false; } private function getExecutionLimit(): int { return $this->cronHelper->getDaemonExecutionLimit() * 3; } private function deleteTaskIfNewsletterDoesNotExist(ScheduledTaskEntity $task) { $queue = $task->getSendingQueue(); $newsletter = $queue ? $queue->getNewsletter() : null; if ($newsletter !== null) { return; } $this->deleteTask($task); } private function deleteTask(ScheduledTaskEntity $task) { $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->info( 'delete task in sending queue', ['task_id' => $task->getId()] ); $queue = $task->getSendingQueue(); if ($queue) { $this->sendingQueuesRepository->remove($queue); } $this->scheduledTaskSubscribersRepository->deleteByScheduledTask($task); $this->scheduledTasksRepository->remove($task); $this->scheduledTasksRepository->flush(); } private function endSending(ScheduledTaskEntity $task, NewsletterEntity $newsletter): void { // We should handle all transitions into these states in the processSending method and end processing there or we throw an exception // This might theoretically happen when multiple cron workers are running in parallel which we don't support and try to prevent $unexpectedStates = [ ScheduledTaskEntity::STATUS_PAUSED, ScheduledTaskEntity::STATUS_INVALID, ScheduledTaskEntity::STATUS_SCHEDULED, ]; if (in_array($task->getStatus(), $unexpectedStates)) { $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->error( 'Sending task reached end of processing in sending queue worker in an unexpected state.', ['task_id' => $task->getId(), 'status' => $task->getStatus()] ); return; } // The task is running but there is no one to send to. // This may happen when we send to all but the execution is interrupted (e.g. by PHP time limit) and we don't update the task status // or if we trigger sending to a newsletter without any subscriber (e.g. scheduled for long time but all were deleted) // Lets set status to completed and update the queue counts if ($task->getStatus() === null && $this->scheduledTaskSubscribersRepository->countUnprocessed($task) === 0) { $task->setStatus(ScheduledTaskEntity::STATUS_COMPLETED); $queue = $task->getSendingQueue(); if ($queue) { $this->sendingQueuesRepository->updateCounts($queue); } $this->scheduledTasksRepository->flush(); } // Task is completed let's do all the stuff for the completed task if ($task->getStatus() === ScheduledTaskEntity::STATUS_COMPLETED) { $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->info( 'completed newsletter sending', ['newsletter_id' => $newsletter->getId(), 'task_id' => $task->getId()] ); $this->newsletterTask->markNewsletterAsSent($newsletter); $this->statsNotificationsScheduler->schedule($newsletter); } } }