From db8a816524fc65a42d0d8c0f14d66bb55f22a235 Mon Sep 17 00:00:00 2001 From: Alejandro Celaya Date: Tue, 26 Jul 2022 10:17:50 +0200 Subject: [PATCH] Implemented redis pub/sub listeners --- composer.json | 2 +- config/autoload/redis.global.php | 9 ++- .../Core/config/event_dispatcher.config.php | 18 ++++- .../RabbitMq/NotifyVisitToRabbitMq.php | 6 +- .../RedisPubSub/NotifyNewShortUrlToRedis.php | 40 ++++++++++- .../RedisPubSub/NotifyVisitToRedis.php | 70 ++++++++++++++++++- 6 files changed, 135 insertions(+), 10 deletions(-) diff --git a/composer.json b/composer.json index a896040d..197db58b 100644 --- a/composer.json +++ b/composer.json @@ -36,7 +36,7 @@ "mezzio/mezzio": "^3.7", "mezzio/mezzio-fastroute": "^3.3", "mezzio/mezzio-problem-details": "^1.5", - "mezzio/mezzio-swoole": "^4.0", + "mezzio/mezzio-swoole": "^4.3", "mlocati/ip-lib": "^1.17", "ocramius/proxy-manager": "^2.11", "pagerfanta/core": "^3.5", diff --git a/config/autoload/redis.global.php b/config/autoload/redis.global.php index a01d0279..1d035055 100644 --- a/config/autoload/redis.global.php +++ b/config/autoload/redis.global.php @@ -6,17 +6,22 @@ use Shlinkio\Shlink\Core\Config\EnvVars; return (static function (): array { $redisServers = EnvVars::REDIS_SERVERS->loadFromEnv(); + $pubSub = [ + 'redis' => [ + 'pub_sub_enabled' => $redisServers !== null && EnvVars::REDIS_PUB_SUB_ENABLED->loadFromEnv(false), + ], + ]; return match ($redisServers) { - null => [], + null => $pubSub, default => [ 'cache' => [ 'redis' => [ 'servers' => $redisServers, 'sentinel_service' => EnvVars::REDIS_SENTINEL_SERVICE->loadFromEnv(), - 'pub_sub_enabled' => (bool) EnvVars::REDIS_PUB_SUB_ENABLED->loadFromEnv(false), ], ], + ...$pubSub, ], }; })(); diff --git a/module/Core/config/event_dispatcher.config.php b/module/Core/config/event_dispatcher.config.php index 7ca2e072..908c8183 100644 --- a/module/Core/config/event_dispatcher.config.php +++ b/module/Core/config/event_dispatcher.config.php @@ -7,6 +7,7 @@ namespace Shlinkio\Shlink\Core; use Laminas\ServiceManager\AbstractFactory\ConfigAbstractFactory; use Psr\EventDispatcher\EventDispatcherInterface; use Shlinkio\Shlink\CLI\Util\GeolocationDbUpdater; +use Shlinkio\Shlink\Common\Cache\RedisPublishingHelper; use Shlinkio\Shlink\Common\RabbitMq\RabbitMqPublishingHelper; use Shlinkio\Shlink\IpGeolocation\GeoLite2\DbUpdater; use Shlinkio\Shlink\IpGeolocation\Resolver\IpLocationResolverInterface; @@ -117,8 +118,21 @@ return [ ShortUrl\Transformer\ShortUrlDataTransformer::class, Options\RabbitMqOptions::class, ], - EventDispatcher\RedisPubSub\NotifyVisitToRedis::class => [], - EventDispatcher\RedisPubSub\NotifyNewShortUrlToRedis::class => [], + EventDispatcher\RedisPubSub\NotifyVisitToRedis::class => [ + RedisPublishingHelper::class, + 'em', + 'Logger_Shlink', + Visit\Transformer\OrphanVisitDataTransformer::class, + ShortUrl\Transformer\ShortUrlDataTransformer::class, + 'config.redis.pub_sub_enabled', + ], + EventDispatcher\RedisPubSub\NotifyNewShortUrlToRedis::class => [ + RedisPublishingHelper::class, + 'em', + 'Logger_Shlink', + ShortUrl\Transformer\ShortUrlDataTransformer::class, + 'config.redis.pub_sub_enabled', + ], EventDispatcher\UpdateGeoLiteDb::class => [GeolocationDbUpdater::class, 'Logger_Shlink'], ], diff --git a/module/Core/src/EventDispatcher/RabbitMq/NotifyVisitToRabbitMq.php b/module/Core/src/EventDispatcher/RabbitMq/NotifyVisitToRabbitMq.php index 1208c291..b3e4e8c1 100644 --- a/module/Core/src/EventDispatcher/RabbitMq/NotifyVisitToRabbitMq.php +++ b/module/Core/src/EventDispatcher/RabbitMq/NotifyVisitToRabbitMq.php @@ -14,6 +14,8 @@ use Shlinkio\Shlink\Core\EventDispatcher\Topic; use Shlinkio\Shlink\Core\Options\RabbitMqOptions; use Throwable; +use function Functional\each; + class NotifyVisitToRabbitMq { public function __construct( @@ -46,9 +48,7 @@ class NotifyVisitToRabbitMq $payload = $this->visitToPayload($visit); try { - foreach ($queues as $queue) { - $this->rabbitMqHelper->publishPayloadInQueue($payload, $queue); - } + each($queues, fn (string $queue) => $this->rabbitMqHelper->publishPayloadInQueue($payload, $queue)); } catch (Throwable $e) { $this->logger->debug('Error while trying to notify RabbitMQ with new visit. {e}', ['e' => $e]); } diff --git a/module/Core/src/EventDispatcher/RedisPubSub/NotifyNewShortUrlToRedis.php b/module/Core/src/EventDispatcher/RedisPubSub/NotifyNewShortUrlToRedis.php index 6813fba3..b97f0047 100644 --- a/module/Core/src/EventDispatcher/RedisPubSub/NotifyNewShortUrlToRedis.php +++ b/module/Core/src/EventDispatcher/RedisPubSub/NotifyNewShortUrlToRedis.php @@ -4,12 +4,50 @@ declare(strict_types=1); namespace Shlinkio\Shlink\Core\EventDispatcher\RedisPubSub; +use Doctrine\ORM\EntityManagerInterface; +use Psr\Log\LoggerInterface; +use Shlinkio\Shlink\Common\Cache\RedisPublishingHelperInterface; +use Shlinkio\Shlink\Common\Rest\DataTransformerInterface; +use Shlinkio\Shlink\Core\Entity\ShortUrl; use Shlinkio\Shlink\Core\EventDispatcher\Event\ShortUrlCreated; +use Shlinkio\Shlink\Core\EventDispatcher\Topic; +use Throwable; class NotifyNewShortUrlToRedis { + public function __construct( + private readonly RedisPublishingHelperInterface $redisHelper, + private readonly EntityManagerInterface $em, + private readonly LoggerInterface $logger, + private readonly DataTransformerInterface $shortUrlTransformer, + private readonly bool $enabled, + ) { + } + public function __invoke(ShortUrlCreated $shortUrlCreated): void { - // TODO: Implement __invoke() method. + if (! $this->enabled) { + return; + } + + $shortUrlId = $shortUrlCreated->shortUrlId; + $shortUrl = $this->em->find(ShortUrl::class, $shortUrlId); + + if ($shortUrl === null) { + $this->logger->warning( + 'Tried to notify Redis pub/sub for new short URL with id "{shortUrlId}", but it does not exist.', + ['shortUrlId' => $shortUrlId], + ); + return; + } + + try { + $this->redisHelper->publishPayloadInQueue( + ['shortUrl' => $this->shortUrlTransformer->transform($shortUrl)], + Topic::NEW_SHORT_URL->value, + ); + } catch (Throwable $e) { + $this->logger->debug('Error while trying to notify Redis pub/sub with new short URL. {e}', ['e' => $e]); + } } } diff --git a/module/Core/src/EventDispatcher/RedisPubSub/NotifyVisitToRedis.php b/module/Core/src/EventDispatcher/RedisPubSub/NotifyVisitToRedis.php index e7f87aaf..0677c1ed 100644 --- a/module/Core/src/EventDispatcher/RedisPubSub/NotifyVisitToRedis.php +++ b/module/Core/src/EventDispatcher/RedisPubSub/NotifyVisitToRedis.php @@ -4,12 +4,80 @@ declare(strict_types=1); namespace Shlinkio\Shlink\Core\EventDispatcher\RedisPubSub; +use Doctrine\ORM\EntityManagerInterface; +use Psr\Log\LoggerInterface; +use Shlinkio\Shlink\Common\Cache\RedisPublishingHelperInterface; +use Shlinkio\Shlink\Common\Rest\DataTransformerInterface; +use Shlinkio\Shlink\Core\Entity\Visit; use Shlinkio\Shlink\Core\EventDispatcher\Event\VisitLocated; +use Shlinkio\Shlink\Core\EventDispatcher\Topic; +use Throwable; + +use function Functional\each; class NotifyVisitToRedis { + public function __construct( + private readonly RedisPublishingHelperInterface $redisHelper, + private readonly EntityManagerInterface $em, + private readonly LoggerInterface $logger, + private readonly DataTransformerInterface $orphanVisitTransformer, + private readonly DataTransformerInterface $shortUrlTransformer, + private readonly bool $enabled, + ) { + } + public function __invoke(VisitLocated $visitLocated): void { - // TODO: Implement __invoke() method. + if (! $this->enabled) { + return; + } + + $visitId = $visitLocated->visitId; + $visit = $this->em->find(Visit::class, $visitId); + + if ($visit === null) { + $this->logger->warning( + 'Tried to notify Redis pub/sub for visit with id "{visitId}", but it does not exist.', + ['visitId' => $visitId], + ); + return; + } + + $queues = $this->determineQueuesToPublishTo($visit); + $payload = $this->visitToPayload($visit); + + try { + each($queues, fn (string $queue) => $this->redisHelper->publishPayloadInQueue($payload, $queue)); + } catch (Throwable $e) { + $this->logger->debug('Error while trying to notify Redis pub/sub with new visit. {e}', ['e' => $e]); + } + } + + /** + * @return string[] + */ + private function determineQueuesToPublishTo(Visit $visit): array + { + if ($visit->isOrphan()) { + return [Topic::NEW_ORPHAN_VISIT->value]; + } + + return [ + Topic::NEW_VISIT->value, + Topic::newShortUrlVisit($visit->getShortUrl()?->getShortCode()), + ]; + } + + private function visitToPayload(Visit $visit): array + { + if ($visit->isOrphan()) { + return ['visit' => $this->orphanVisitTransformer->transform($visit)]; + } + + return [ + 'visit' => $visit->jsonSerialize(), + 'shortUrl' => $this->shortUrlTransformer->transform($visit->getShortUrl()), + ]; } }