From eff50ca202bc232cff0eb78f57aa272f25c669b5 Mon Sep 17 00:00:00 2001 From: Alejandro Celaya Date: Mon, 25 Jul 2022 18:23:13 +0200 Subject: [PATCH 01/19] Created new event listeners to send events to redis pub/sub --- composer.json | 5 ++--- config/autoload/installer.global.php | 1 + config/autoload/locks.global.php | 4 ++-- config/autoload/redis.global.php | 1 + module/Core/config/event_dispatcher.config.php | 12 ++++++++++++ module/Core/src/Config/EnvVars.php | 1 + .../Mercure/NotifyVisitToMercure.php | 4 ++-- .../RabbitMq/NotifyVisitToRabbitMq.php | 4 ++-- .../RedisPubSub/NotifyNewShortUrlToRedis.php | 15 +++++++++++++++ .../RedisPubSub/NotifyVisitToRedis.php | 15 +++++++++++++++ 10 files changed, 53 insertions(+), 9 deletions(-) create mode 100644 module/Core/src/EventDispatcher/RedisPubSub/NotifyNewShortUrlToRedis.php create mode 100644 module/Core/src/EventDispatcher/RedisPubSub/NotifyVisitToRedis.php diff --git a/composer.json b/composer.json index 533add72..a896040d 100644 --- a/composer.json +++ b/composer.json @@ -41,14 +41,13 @@ "ocramius/proxy-manager": "^2.11", "pagerfanta/core": "^3.5", "php-middleware/request-id": "^4.1", - "predis/predis": "^1.1", "pugx/shortid-php": "^1.0", "ramsey/uuid": "^4.2", - "shlinkio/shlink-common": "dev-main#0396706 as 4.5", + "shlinkio/shlink-common": "dev-main#4019020 as 4.5", "shlinkio/shlink-config": "^1.6", "shlinkio/shlink-event-dispatcher": "^2.4", "shlinkio/shlink-importer": "^3.0", - "shlinkio/shlink-installer": "^7.1", + "shlinkio/shlink-installer": "dev-develop#f76e9aa as 7.2", "shlinkio/shlink-ip-geolocation": "^2.2", "symfony/console": "^6.0", "symfony/filesystem": "^6.0", diff --git a/config/autoload/installer.global.php b/config/autoload/installer.global.php index 3cada5db..c82b4a97 100644 --- a/config/autoload/installer.global.php +++ b/config/autoload/installer.global.php @@ -32,6 +32,7 @@ return [ Option\Worker\WebWorkerNumConfigOption::class, Option\Redis\RedisServersConfigOption::class, Option\Redis\RedisSentinelServiceConfigOption::class, + Option\Redis\RedisPubSubConfigOption::class, Option\UrlShortener\ShortCodeLengthOption::class, Option\Mercure\EnableMercureConfigOption::class, Option\Mercure\MercurePublicUrlConfigOption::class, diff --git a/config/autoload/locks.global.php b/config/autoload/locks.global.php index 9b014496..5e37e770 100644 --- a/config/autoload/locks.global.php +++ b/config/autoload/locks.global.php @@ -3,7 +3,7 @@ declare(strict_types=1); use Laminas\ServiceManager\AbstractFactory\ConfigAbstractFactory; -use Predis\ClientInterface as PredisClient; +use Shlinkio\Shlink\Common\Cache\RedisFactory; use Shlinkio\Shlink\Common\Logger\LoggerAwareDelegatorFactory; use Shlinkio\Shlink\Core\Config\EnvVars; use Symfony\Component\Lock; @@ -38,7 +38,7 @@ return [ ConfigAbstractFactory::class => [ Lock\Store\FlockStore::class => ['config.locks.locks_dir'], - Lock\Store\RedisStore::class => [PredisClient::class], + Lock\Store\RedisStore::class => [RedisFactory::SERVICE_NAME], Lock\LockFactory::class => ['lock_store'], LOCAL_LOCK_FACTORY => ['local_lock_store'], ], diff --git a/config/autoload/redis.global.php b/config/autoload/redis.global.php index 0133d1b1..a01d0279 100644 --- a/config/autoload/redis.global.php +++ b/config/autoload/redis.global.php @@ -14,6 +14,7 @@ return (static function (): array { 'redis' => [ 'servers' => $redisServers, 'sentinel_service' => EnvVars::REDIS_SENTINEL_SERVICE->loadFromEnv(), + 'pub_sub_enabled' => (bool) EnvVars::REDIS_PUB_SUB_ENABLED->loadFromEnv(false), ], ], ], diff --git a/module/Core/config/event_dispatcher.config.php b/module/Core/config/event_dispatcher.config.php index 96907a5d..7ca2e072 100644 --- a/module/Core/config/event_dispatcher.config.php +++ b/module/Core/config/event_dispatcher.config.php @@ -24,12 +24,14 @@ return [ EventDispatcher\Event\VisitLocated::class => [ EventDispatcher\Mercure\NotifyVisitToMercure::class, EventDispatcher\RabbitMq\NotifyVisitToRabbitMq::class, + EventDispatcher\RedisPubSub\NotifyVisitToRedis::class, EventDispatcher\NotifyVisitToWebHooks::class, EventDispatcher\UpdateGeoLiteDb::class, ], EventDispatcher\Event\ShortUrlCreated::class => [ EventDispatcher\Mercure\NotifyNewShortUrlToMercure::class, EventDispatcher\RabbitMq\NotifyNewShortUrlToRabbitMq::class, + EventDispatcher\RedisPubSub\NotifyNewShortUrlToRedis::class, ], ], ], @@ -42,6 +44,8 @@ return [ EventDispatcher\Mercure\NotifyNewShortUrlToMercure::class => ConfigAbstractFactory::class, EventDispatcher\RabbitMq\NotifyVisitToRabbitMq::class => ConfigAbstractFactory::class, EventDispatcher\RabbitMq\NotifyNewShortUrlToRabbitMq::class => ConfigAbstractFactory::class, + EventDispatcher\RedisPubSub\NotifyVisitToRedis::class => ConfigAbstractFactory::class, + EventDispatcher\RedisPubSub\NotifyNewShortUrlToRedis::class => ConfigAbstractFactory::class, EventDispatcher\UpdateGeoLiteDb::class => ConfigAbstractFactory::class, ], @@ -58,6 +62,12 @@ return [ EventDispatcher\RabbitMq\NotifyNewShortUrlToRabbitMq::class => [ EventDispatcher\CloseDbConnectionEventListenerDelegator::class, ], + EventDispatcher\RedisPubSub\NotifyVisitToRedis::class => [ + EventDispatcher\CloseDbConnectionEventListenerDelegator::class, + ], + EventDispatcher\RedisPubSub\NotifyNewShortUrlToRedis::class => [ + EventDispatcher\CloseDbConnectionEventListenerDelegator::class, + ], EventDispatcher\NotifyVisitToWebHooks::class => [ EventDispatcher\CloseDbConnectionEventListenerDelegator::class, ], @@ -107,6 +117,8 @@ return [ ShortUrl\Transformer\ShortUrlDataTransformer::class, Options\RabbitMqOptions::class, ], + EventDispatcher\RedisPubSub\NotifyVisitToRedis::class => [], + EventDispatcher\RedisPubSub\NotifyNewShortUrlToRedis::class => [], EventDispatcher\UpdateGeoLiteDb::class => [GeolocationDbUpdater::class, 'Logger_Shlink'], ], diff --git a/module/Core/src/Config/EnvVars.php b/module/Core/src/Config/EnvVars.php index 33abae01..a68f24f3 100644 --- a/module/Core/src/Config/EnvVars.php +++ b/module/Core/src/Config/EnvVars.php @@ -19,6 +19,7 @@ enum EnvVars: string case GEOLITE_LICENSE_KEY = 'GEOLITE_LICENSE_KEY'; case REDIS_SERVERS = 'REDIS_SERVERS'; case REDIS_SENTINEL_SERVICE = 'REDIS_SENTINEL_SERVICE'; + case REDIS_PUB_SUB_ENABLED = 'REDIS_PUB_SUB_ENABLED'; case MERCURE_PUBLIC_HUB_URL = 'MERCURE_PUBLIC_HUB_URL'; case MERCURE_INTERNAL_HUB_URL = 'MERCURE_INTERNAL_HUB_URL'; case MERCURE_JWT_SECRET = 'MERCURE_JWT_SECRET'; diff --git a/module/Core/src/EventDispatcher/Mercure/NotifyVisitToMercure.php b/module/Core/src/EventDispatcher/Mercure/NotifyVisitToMercure.php index 11d3a8e4..f9610021 100644 --- a/module/Core/src/EventDispatcher/Mercure/NotifyVisitToMercure.php +++ b/module/Core/src/EventDispatcher/Mercure/NotifyVisitToMercure.php @@ -25,9 +25,9 @@ class NotifyVisitToMercure ) { } - public function __invoke(VisitLocated $shortUrlLocated): void + public function __invoke(VisitLocated $visitLocated): void { - $visitId = $shortUrlLocated->visitId; + $visitId = $visitLocated->visitId; /** @var Visit|null $visit */ $visit = $this->em->find(Visit::class, $visitId); diff --git a/module/Core/src/EventDispatcher/RabbitMq/NotifyVisitToRabbitMq.php b/module/Core/src/EventDispatcher/RabbitMq/NotifyVisitToRabbitMq.php index 34a951ec..1208c291 100644 --- a/module/Core/src/EventDispatcher/RabbitMq/NotifyVisitToRabbitMq.php +++ b/module/Core/src/EventDispatcher/RabbitMq/NotifyVisitToRabbitMq.php @@ -26,13 +26,13 @@ class NotifyVisitToRabbitMq ) { } - public function __invoke(VisitLocated $shortUrlLocated): void + public function __invoke(VisitLocated $visitLocated): void { if (! $this->options->isEnabled()) { return; } - $visitId = $shortUrlLocated->visitId; + $visitId = $visitLocated->visitId; $visit = $this->em->find(Visit::class, $visitId); if ($visit === null) { diff --git a/module/Core/src/EventDispatcher/RedisPubSub/NotifyNewShortUrlToRedis.php b/module/Core/src/EventDispatcher/RedisPubSub/NotifyNewShortUrlToRedis.php new file mode 100644 index 00000000..6813fba3 --- /dev/null +++ b/module/Core/src/EventDispatcher/RedisPubSub/NotifyNewShortUrlToRedis.php @@ -0,0 +1,15 @@ + Date: Tue, 26 Jul 2022 10:17:50 +0200 Subject: [PATCH 02/19] Implemented redis pub/sub listeners --- composer.json | 2 +- config/autoload/redis.global.php | 9 ++- .../Core/config/event_dispatcher.config.php | 18 ++++- .../RabbitMq/NotifyVisitToRabbitMq.php | 6 +- .../RedisPubSub/NotifyNewShortUrlToRedis.php | 40 ++++++++++- .../RedisPubSub/NotifyVisitToRedis.php | 70 ++++++++++++++++++- 6 files changed, 135 insertions(+), 10 deletions(-) diff --git a/composer.json b/composer.json index a896040d..197db58b 100644 --- a/composer.json +++ b/composer.json @@ -36,7 +36,7 @@ "mezzio/mezzio": "^3.7", "mezzio/mezzio-fastroute": "^3.3", "mezzio/mezzio-problem-details": "^1.5", - "mezzio/mezzio-swoole": "^4.0", + "mezzio/mezzio-swoole": "^4.3", "mlocati/ip-lib": "^1.17", "ocramius/proxy-manager": "^2.11", "pagerfanta/core": "^3.5", diff --git a/config/autoload/redis.global.php b/config/autoload/redis.global.php index a01d0279..1d035055 100644 --- a/config/autoload/redis.global.php +++ b/config/autoload/redis.global.php @@ -6,17 +6,22 @@ use Shlinkio\Shlink\Core\Config\EnvVars; return (static function (): array { $redisServers = EnvVars::REDIS_SERVERS->loadFromEnv(); + $pubSub = [ + 'redis' => [ + 'pub_sub_enabled' => $redisServers !== null && EnvVars::REDIS_PUB_SUB_ENABLED->loadFromEnv(false), + ], + ]; return match ($redisServers) { - null => [], + null => $pubSub, default => [ 'cache' => [ 'redis' => [ 'servers' => $redisServers, 'sentinel_service' => EnvVars::REDIS_SENTINEL_SERVICE->loadFromEnv(), - 'pub_sub_enabled' => (bool) EnvVars::REDIS_PUB_SUB_ENABLED->loadFromEnv(false), ], ], + ...$pubSub, ], }; })(); diff --git a/module/Core/config/event_dispatcher.config.php b/module/Core/config/event_dispatcher.config.php index 7ca2e072..908c8183 100644 --- a/module/Core/config/event_dispatcher.config.php +++ b/module/Core/config/event_dispatcher.config.php @@ -7,6 +7,7 @@ namespace Shlinkio\Shlink\Core; use Laminas\ServiceManager\AbstractFactory\ConfigAbstractFactory; use Psr\EventDispatcher\EventDispatcherInterface; use Shlinkio\Shlink\CLI\Util\GeolocationDbUpdater; +use Shlinkio\Shlink\Common\Cache\RedisPublishingHelper; use Shlinkio\Shlink\Common\RabbitMq\RabbitMqPublishingHelper; use Shlinkio\Shlink\IpGeolocation\GeoLite2\DbUpdater; use Shlinkio\Shlink\IpGeolocation\Resolver\IpLocationResolverInterface; @@ -117,8 +118,21 @@ return [ ShortUrl\Transformer\ShortUrlDataTransformer::class, Options\RabbitMqOptions::class, ], - EventDispatcher\RedisPubSub\NotifyVisitToRedis::class => [], - EventDispatcher\RedisPubSub\NotifyNewShortUrlToRedis::class => [], + EventDispatcher\RedisPubSub\NotifyVisitToRedis::class => [ + RedisPublishingHelper::class, + 'em', + 'Logger_Shlink', + Visit\Transformer\OrphanVisitDataTransformer::class, + ShortUrl\Transformer\ShortUrlDataTransformer::class, + 'config.redis.pub_sub_enabled', + ], + EventDispatcher\RedisPubSub\NotifyNewShortUrlToRedis::class => [ + RedisPublishingHelper::class, + 'em', + 'Logger_Shlink', + ShortUrl\Transformer\ShortUrlDataTransformer::class, + 'config.redis.pub_sub_enabled', + ], EventDispatcher\UpdateGeoLiteDb::class => [GeolocationDbUpdater::class, 'Logger_Shlink'], ], diff --git a/module/Core/src/EventDispatcher/RabbitMq/NotifyVisitToRabbitMq.php b/module/Core/src/EventDispatcher/RabbitMq/NotifyVisitToRabbitMq.php index 1208c291..b3e4e8c1 100644 --- a/module/Core/src/EventDispatcher/RabbitMq/NotifyVisitToRabbitMq.php +++ b/module/Core/src/EventDispatcher/RabbitMq/NotifyVisitToRabbitMq.php @@ -14,6 +14,8 @@ use Shlinkio\Shlink\Core\EventDispatcher\Topic; use Shlinkio\Shlink\Core\Options\RabbitMqOptions; use Throwable; +use function Functional\each; + class NotifyVisitToRabbitMq { public function __construct( @@ -46,9 +48,7 @@ class NotifyVisitToRabbitMq $payload = $this->visitToPayload($visit); try { - foreach ($queues as $queue) { - $this->rabbitMqHelper->publishPayloadInQueue($payload, $queue); - } + each($queues, fn (string $queue) => $this->rabbitMqHelper->publishPayloadInQueue($payload, $queue)); } catch (Throwable $e) { $this->logger->debug('Error while trying to notify RabbitMQ with new visit. {e}', ['e' => $e]); } diff --git a/module/Core/src/EventDispatcher/RedisPubSub/NotifyNewShortUrlToRedis.php b/module/Core/src/EventDispatcher/RedisPubSub/NotifyNewShortUrlToRedis.php index 6813fba3..b97f0047 100644 --- a/module/Core/src/EventDispatcher/RedisPubSub/NotifyNewShortUrlToRedis.php +++ b/module/Core/src/EventDispatcher/RedisPubSub/NotifyNewShortUrlToRedis.php @@ -4,12 +4,50 @@ declare(strict_types=1); namespace Shlinkio\Shlink\Core\EventDispatcher\RedisPubSub; +use Doctrine\ORM\EntityManagerInterface; +use Psr\Log\LoggerInterface; +use Shlinkio\Shlink\Common\Cache\RedisPublishingHelperInterface; +use Shlinkio\Shlink\Common\Rest\DataTransformerInterface; +use Shlinkio\Shlink\Core\Entity\ShortUrl; use Shlinkio\Shlink\Core\EventDispatcher\Event\ShortUrlCreated; +use Shlinkio\Shlink\Core\EventDispatcher\Topic; +use Throwable; class NotifyNewShortUrlToRedis { + public function __construct( + private readonly RedisPublishingHelperInterface $redisHelper, + private readonly EntityManagerInterface $em, + private readonly LoggerInterface $logger, + private readonly DataTransformerInterface $shortUrlTransformer, + private readonly bool $enabled, + ) { + } + public function __invoke(ShortUrlCreated $shortUrlCreated): void { - // TODO: Implement __invoke() method. + if (! $this->enabled) { + return; + } + + $shortUrlId = $shortUrlCreated->shortUrlId; + $shortUrl = $this->em->find(ShortUrl::class, $shortUrlId); + + if ($shortUrl === null) { + $this->logger->warning( + 'Tried to notify Redis pub/sub for new short URL with id "{shortUrlId}", but it does not exist.', + ['shortUrlId' => $shortUrlId], + ); + return; + } + + try { + $this->redisHelper->publishPayloadInQueue( + ['shortUrl' => $this->shortUrlTransformer->transform($shortUrl)], + Topic::NEW_SHORT_URL->value, + ); + } catch (Throwable $e) { + $this->logger->debug('Error while trying to notify Redis pub/sub with new short URL. {e}', ['e' => $e]); + } } } diff --git a/module/Core/src/EventDispatcher/RedisPubSub/NotifyVisitToRedis.php b/module/Core/src/EventDispatcher/RedisPubSub/NotifyVisitToRedis.php index e7f87aaf..0677c1ed 100644 --- a/module/Core/src/EventDispatcher/RedisPubSub/NotifyVisitToRedis.php +++ b/module/Core/src/EventDispatcher/RedisPubSub/NotifyVisitToRedis.php @@ -4,12 +4,80 @@ declare(strict_types=1); namespace Shlinkio\Shlink\Core\EventDispatcher\RedisPubSub; +use Doctrine\ORM\EntityManagerInterface; +use Psr\Log\LoggerInterface; +use Shlinkio\Shlink\Common\Cache\RedisPublishingHelperInterface; +use Shlinkio\Shlink\Common\Rest\DataTransformerInterface; +use Shlinkio\Shlink\Core\Entity\Visit; use Shlinkio\Shlink\Core\EventDispatcher\Event\VisitLocated; +use Shlinkio\Shlink\Core\EventDispatcher\Topic; +use Throwable; + +use function Functional\each; class NotifyVisitToRedis { + public function __construct( + private readonly RedisPublishingHelperInterface $redisHelper, + private readonly EntityManagerInterface $em, + private readonly LoggerInterface $logger, + private readonly DataTransformerInterface $orphanVisitTransformer, + private readonly DataTransformerInterface $shortUrlTransformer, + private readonly bool $enabled, + ) { + } + public function __invoke(VisitLocated $visitLocated): void { - // TODO: Implement __invoke() method. + if (! $this->enabled) { + return; + } + + $visitId = $visitLocated->visitId; + $visit = $this->em->find(Visit::class, $visitId); + + if ($visit === null) { + $this->logger->warning( + 'Tried to notify Redis pub/sub for visit with id "{visitId}", but it does not exist.', + ['visitId' => $visitId], + ); + return; + } + + $queues = $this->determineQueuesToPublishTo($visit); + $payload = $this->visitToPayload($visit); + + try { + each($queues, fn (string $queue) => $this->redisHelper->publishPayloadInQueue($payload, $queue)); + } catch (Throwable $e) { + $this->logger->debug('Error while trying to notify Redis pub/sub with new visit. {e}', ['e' => $e]); + } + } + + /** + * @return string[] + */ + private function determineQueuesToPublishTo(Visit $visit): array + { + if ($visit->isOrphan()) { + return [Topic::NEW_ORPHAN_VISIT->value]; + } + + return [ + Topic::NEW_VISIT->value, + Topic::newShortUrlVisit($visit->getShortUrl()?->getShortCode()), + ]; + } + + private function visitToPayload(Visit $visit): array + { + if ($visit->isOrphan()) { + return ['visit' => $this->orphanVisitTransformer->transform($visit)]; + } + + return [ + 'visit' => $visit->jsonSerialize(), + 'shortUrl' => $this->shortUrlTransformer->transform($visit->getShortUrl()), + ]; } } From 233bb603cfc7cf9a084aaf5dd08c543ff7ba2890 Mon Sep 17 00:00:00 2001 From: Alejandro Celaya Date: Tue, 26 Jul 2022 10:25:16 +0200 Subject: [PATCH 03/19] Updated local redis config --- config/autoload/redis.local.php.local | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/config/autoload/redis.local.php.local b/config/autoload/redis.local.php.local index 08dbae32..9bd8fea6 100644 --- a/config/autoload/redis.local.php.local +++ b/config/autoload/redis.local.php.local @@ -7,12 +7,13 @@ return [ 'cache' => [ 'redis' => [ 'servers' => 'tcp://shlink_redis:6379', -// 'servers' => [ -// 'tcp://shlink_redis:6379', -// ], ], ], + 'redis' => [ + 'pub_sub_enabled' => true, + ], + 'dependencies' => [ 'aliases' => [ // With this config, a user could alias 'lock_store' => 'redis_lock_store' to override the default From 791d6b7e5773ba3594da75de39c3fbc2dd753560 Mon Sep 17 00:00:00 2001 From: Alejandro Celaya Date: Tue, 26 Jul 2022 12:07:27 +0200 Subject: [PATCH 04/19] Updated to latest common, with unified publishing API --- composer.json | 15 ++++---- .../RabbitMq/NotifyNewShortUrlToRabbitMq.php | 11 +++--- .../RabbitMq/NotifyVisitToRabbitMq.php | 9 +++-- .../RedisPubSub/NotifyNewShortUrlToRedis.php | 11 +++--- .../RedisPubSub/NotifyVisitToRedis.php | 9 +++-- .../NotifyNewShortUrlToRabbitMqTest.php | 16 ++++----- .../RabbitMq/NotifyVisitToRabbitMqTest.php | 34 +++++++++++-------- 7 files changed, 58 insertions(+), 47 deletions(-) diff --git a/composer.json b/composer.json index 197db58b..9add38d7 100644 --- a/composer.json +++ b/composer.json @@ -43,18 +43,17 @@ "php-middleware/request-id": "^4.1", "pugx/shortid-php": "^1.0", "ramsey/uuid": "^4.2", - "shlinkio/shlink-common": "dev-main#4019020 as 4.5", + "shlinkio/shlink-common": "dev-main#b3848ad as 4.5", "shlinkio/shlink-config": "^1.6", "shlinkio/shlink-event-dispatcher": "^2.4", "shlinkio/shlink-importer": "^3.0", "shlinkio/shlink-installer": "dev-develop#f76e9aa as 7.2", "shlinkio/shlink-ip-geolocation": "^2.2", - "symfony/console": "^6.0", - "symfony/filesystem": "^6.0", - "symfony/lock": "^6.0", - "symfony/mercure": "^0.6", - "symfony/process": "^6.0", - "symfony/string": "^6.0" + "symfony/console": "^6.1", + "symfony/filesystem": "^6.1", + "symfony/lock": "^6.1", + "symfony/process": "^6.1", + "symfony/string": "^6.1" }, "require-dev": { "cebe/php-openapi": "^1.7", @@ -71,7 +70,7 @@ "roave/security-advisories": "dev-master", "shlinkio/php-coding-standard": "~2.3.0", "shlinkio/shlink-test-utils": "^3.0.1", - "symfony/var-dumper": "^6.0", + "symfony/var-dumper": "^6.1", "veewee/composer-run-parallel": "^1.1" }, "autoload": { diff --git a/module/Core/src/EventDispatcher/RabbitMq/NotifyNewShortUrlToRabbitMq.php b/module/Core/src/EventDispatcher/RabbitMq/NotifyNewShortUrlToRabbitMq.php index 22a7582d..73ed84ca 100644 --- a/module/Core/src/EventDispatcher/RabbitMq/NotifyNewShortUrlToRabbitMq.php +++ b/module/Core/src/EventDispatcher/RabbitMq/NotifyNewShortUrlToRabbitMq.php @@ -6,8 +6,9 @@ namespace Shlinkio\Shlink\Core\EventDispatcher\RabbitMq; use Doctrine\ORM\EntityManagerInterface; use Psr\Log\LoggerInterface; -use Shlinkio\Shlink\Common\RabbitMq\RabbitMqPublishingHelperInterface; use Shlinkio\Shlink\Common\Rest\DataTransformerInterface; +use Shlinkio\Shlink\Common\UpdatePublishing\PublishingHelperInterface; +use Shlinkio\Shlink\Common\UpdatePublishing\Update; use Shlinkio\Shlink\Core\Entity\ShortUrl; use Shlinkio\Shlink\Core\EventDispatcher\Event\ShortUrlCreated; use Shlinkio\Shlink\Core\EventDispatcher\Topic; @@ -17,7 +18,7 @@ use Throwable; class NotifyNewShortUrlToRabbitMq { public function __construct( - private readonly RabbitMqPublishingHelperInterface $rabbitMqHelper, + private readonly PublishingHelperInterface $rabbitMqHelper, private readonly EntityManagerInterface $em, private readonly LoggerInterface $logger, private readonly DataTransformerInterface $shortUrlTransformer, @@ -43,10 +44,10 @@ class NotifyNewShortUrlToRabbitMq } try { - $this->rabbitMqHelper->publishPayloadInQueue( - ['shortUrl' => $this->shortUrlTransformer->transform($shortUrl)], + $this->rabbitMqHelper->publishUpdate(Update::forTopicAndPayload( Topic::NEW_SHORT_URL->value, - ); + ['shortUrl' => $this->shortUrlTransformer->transform($shortUrl)], + )); } catch (Throwable $e) { $this->logger->debug('Error while trying to notify RabbitMQ with new short URL. {e}', ['e' => $e]); } diff --git a/module/Core/src/EventDispatcher/RabbitMq/NotifyVisitToRabbitMq.php b/module/Core/src/EventDispatcher/RabbitMq/NotifyVisitToRabbitMq.php index b3e4e8c1..8f8567db 100644 --- a/module/Core/src/EventDispatcher/RabbitMq/NotifyVisitToRabbitMq.php +++ b/module/Core/src/EventDispatcher/RabbitMq/NotifyVisitToRabbitMq.php @@ -6,8 +6,9 @@ namespace Shlinkio\Shlink\Core\EventDispatcher\RabbitMq; use Doctrine\ORM\EntityManagerInterface; use Psr\Log\LoggerInterface; -use Shlinkio\Shlink\Common\RabbitMq\RabbitMqPublishingHelperInterface; use Shlinkio\Shlink\Common\Rest\DataTransformerInterface; +use Shlinkio\Shlink\Common\UpdatePublishing\PublishingHelperInterface; +use Shlinkio\Shlink\Common\UpdatePublishing\Update; use Shlinkio\Shlink\Core\Entity\Visit; use Shlinkio\Shlink\Core\EventDispatcher\Event\VisitLocated; use Shlinkio\Shlink\Core\EventDispatcher\Topic; @@ -19,7 +20,7 @@ use function Functional\each; class NotifyVisitToRabbitMq { public function __construct( - private readonly RabbitMqPublishingHelperInterface $rabbitMqHelper, + private readonly PublishingHelperInterface $rabbitMqHelper, private readonly EntityManagerInterface $em, private readonly LoggerInterface $logger, private readonly DataTransformerInterface $orphanVisitTransformer, @@ -48,7 +49,9 @@ class NotifyVisitToRabbitMq $payload = $this->visitToPayload($visit); try { - each($queues, fn (string $queue) => $this->rabbitMqHelper->publishPayloadInQueue($payload, $queue)); + each($queues, fn (string $queue) => $this->rabbitMqHelper->publishUpdate( + Update::forTopicAndPayload($queue, $payload), + )); } catch (Throwable $e) { $this->logger->debug('Error while trying to notify RabbitMQ with new visit. {e}', ['e' => $e]); } diff --git a/module/Core/src/EventDispatcher/RedisPubSub/NotifyNewShortUrlToRedis.php b/module/Core/src/EventDispatcher/RedisPubSub/NotifyNewShortUrlToRedis.php index b97f0047..638aa88a 100644 --- a/module/Core/src/EventDispatcher/RedisPubSub/NotifyNewShortUrlToRedis.php +++ b/module/Core/src/EventDispatcher/RedisPubSub/NotifyNewShortUrlToRedis.php @@ -6,8 +6,9 @@ namespace Shlinkio\Shlink\Core\EventDispatcher\RedisPubSub; use Doctrine\ORM\EntityManagerInterface; use Psr\Log\LoggerInterface; -use Shlinkio\Shlink\Common\Cache\RedisPublishingHelperInterface; use Shlinkio\Shlink\Common\Rest\DataTransformerInterface; +use Shlinkio\Shlink\Common\UpdatePublishing\PublishingHelperInterface; +use Shlinkio\Shlink\Common\UpdatePublishing\Update; use Shlinkio\Shlink\Core\Entity\ShortUrl; use Shlinkio\Shlink\Core\EventDispatcher\Event\ShortUrlCreated; use Shlinkio\Shlink\Core\EventDispatcher\Topic; @@ -16,7 +17,7 @@ use Throwable; class NotifyNewShortUrlToRedis { public function __construct( - private readonly RedisPublishingHelperInterface $redisHelper, + private readonly PublishingHelperInterface $redisHelper, private readonly EntityManagerInterface $em, private readonly LoggerInterface $logger, private readonly DataTransformerInterface $shortUrlTransformer, @@ -42,10 +43,10 @@ class NotifyNewShortUrlToRedis } try { - $this->redisHelper->publishPayloadInQueue( - ['shortUrl' => $this->shortUrlTransformer->transform($shortUrl)], + $this->redisHelper->publishUpdate(Update::forTopicAndPayload( Topic::NEW_SHORT_URL->value, - ); + ['shortUrl' => $this->shortUrlTransformer->transform($shortUrl)], + )); } catch (Throwable $e) { $this->logger->debug('Error while trying to notify Redis pub/sub with new short URL. {e}', ['e' => $e]); } diff --git a/module/Core/src/EventDispatcher/RedisPubSub/NotifyVisitToRedis.php b/module/Core/src/EventDispatcher/RedisPubSub/NotifyVisitToRedis.php index 0677c1ed..3df32267 100644 --- a/module/Core/src/EventDispatcher/RedisPubSub/NotifyVisitToRedis.php +++ b/module/Core/src/EventDispatcher/RedisPubSub/NotifyVisitToRedis.php @@ -6,8 +6,9 @@ namespace Shlinkio\Shlink\Core\EventDispatcher\RedisPubSub; use Doctrine\ORM\EntityManagerInterface; use Psr\Log\LoggerInterface; -use Shlinkio\Shlink\Common\Cache\RedisPublishingHelperInterface; use Shlinkio\Shlink\Common\Rest\DataTransformerInterface; +use Shlinkio\Shlink\Common\UpdatePublishing\PublishingHelperInterface; +use Shlinkio\Shlink\Common\UpdatePublishing\Update; use Shlinkio\Shlink\Core\Entity\Visit; use Shlinkio\Shlink\Core\EventDispatcher\Event\VisitLocated; use Shlinkio\Shlink\Core\EventDispatcher\Topic; @@ -18,7 +19,7 @@ use function Functional\each; class NotifyVisitToRedis { public function __construct( - private readonly RedisPublishingHelperInterface $redisHelper, + private readonly PublishingHelperInterface $redisHelper, private readonly EntityManagerInterface $em, private readonly LoggerInterface $logger, private readonly DataTransformerInterface $orphanVisitTransformer, @@ -48,7 +49,9 @@ class NotifyVisitToRedis $payload = $this->visitToPayload($visit); try { - each($queues, fn (string $queue) => $this->redisHelper->publishPayloadInQueue($payload, $queue)); + each($queues, fn (string $queue) => $this->redisHelper->publishUpdate( + Update::forTopicAndPayload($queue, $payload), + )); } catch (Throwable $e) { $this->logger->debug('Error while trying to notify Redis pub/sub with new visit. {e}', ['e' => $e]); } diff --git a/module/Core/test/EventDispatcher/RabbitMq/NotifyNewShortUrlToRabbitMqTest.php b/module/Core/test/EventDispatcher/RabbitMq/NotifyNewShortUrlToRabbitMqTest.php index e98e342f..1f23b18b 100644 --- a/module/Core/test/EventDispatcher/RabbitMq/NotifyNewShortUrlToRabbitMqTest.php +++ b/module/Core/test/EventDispatcher/RabbitMq/NotifyNewShortUrlToRabbitMqTest.php @@ -13,7 +13,8 @@ use Prophecy\PhpUnit\ProphecyTrait; use Prophecy\Prophecy\ObjectProphecy; use Psr\Log\LoggerInterface; use RuntimeException; -use Shlinkio\Shlink\Common\RabbitMq\RabbitMqPublishingHelperInterface; +use Shlinkio\Shlink\Common\UpdatePublishing\PublishingHelperInterface; +use Shlinkio\Shlink\Common\UpdatePublishing\Update; use Shlinkio\Shlink\Core\Entity\ShortUrl; use Shlinkio\Shlink\Core\EventDispatcher\Event\ShortUrlCreated; use Shlinkio\Shlink\Core\EventDispatcher\RabbitMq\NotifyNewShortUrlToRabbitMq; @@ -35,7 +36,7 @@ class NotifyNewShortUrlToRabbitMqTest extends TestCase protected function setUp(): void { - $this->helper = $this->prophesize(RabbitMqPublishingHelperInterface::class); + $this->helper = $this->prophesize(PublishingHelperInterface::class); $this->em = $this->prophesize(EntityManagerInterface::class); $this->logger = $this->prophesize(LoggerInterface::class); $this->options = new RabbitMqOptions(['enabled' => true]); @@ -59,7 +60,7 @@ class NotifyNewShortUrlToRabbitMqTest extends TestCase $this->em->find(Argument::cetera())->shouldNotHaveBeenCalled(); $this->logger->warning(Argument::cetera())->shouldNotHaveBeenCalled(); $this->logger->debug(Argument::cetera())->shouldNotHaveBeenCalled(); - $this->helper->publishPayloadInQueue(Argument::cetera())->shouldNotHaveBeenCalled(); + $this->helper->publishUpdate(Argument::cetera())->shouldNotHaveBeenCalled(); } /** @test */ @@ -77,7 +78,7 @@ class NotifyNewShortUrlToRabbitMqTest extends TestCase $find->shouldHaveBeenCalledOnce(); $logWarning->shouldHaveBeenCalledOnce(); $this->logger->debug(Argument::cetera())->shouldNotHaveBeenCalled(); - $this->helper->publishPayloadInQueue(Argument::cetera())->shouldNotHaveBeenCalled(); + $this->helper->publishUpdate(Argument::cetera())->shouldNotHaveBeenCalled(); } /** @test */ @@ -89,9 +90,8 @@ class NotifyNewShortUrlToRabbitMqTest extends TestCase ($this->listener)(new ShortUrlCreated($shortUrlId)); $find->shouldHaveBeenCalledOnce(); - $this->helper->publishPayloadInQueue( - Argument::type('array'), - Topic::NEW_SHORT_URL->value, + $this->helper->publishUpdate( + Argument::that(fn (Update $update) => $update->topic === Topic::NEW_SHORT_URL->value), )->shouldHaveBeenCalledOnce(); $this->logger->debug(Argument::cetera())->shouldNotHaveBeenCalled(); } @@ -104,7 +104,7 @@ class NotifyNewShortUrlToRabbitMqTest extends TestCase { $shortUrlId = '123'; $find = $this->em->find(ShortUrl::class, $shortUrlId)->willReturn(ShortUrl::withLongUrl('')); - $publish = $this->helper->publishPayloadInQueue(Argument::cetera())->willThrow($e); + $publish = $this->helper->publishUpdate(Argument::cetera())->willThrow($e); ($this->listener)(new ShortUrlCreated($shortUrlId)); diff --git a/module/Core/test/EventDispatcher/RabbitMq/NotifyVisitToRabbitMqTest.php b/module/Core/test/EventDispatcher/RabbitMq/NotifyVisitToRabbitMqTest.php index 6d367c17..ada7e551 100644 --- a/module/Core/test/EventDispatcher/RabbitMq/NotifyVisitToRabbitMqTest.php +++ b/module/Core/test/EventDispatcher/RabbitMq/NotifyVisitToRabbitMqTest.php @@ -14,7 +14,8 @@ use Prophecy\PhpUnit\ProphecyTrait; use Prophecy\Prophecy\ObjectProphecy; use Psr\Log\LoggerInterface; use RuntimeException; -use Shlinkio\Shlink\Common\RabbitMq\RabbitMqPublishingHelperInterface; +use Shlinkio\Shlink\Common\UpdatePublishing\PublishingHelperInterface; +use Shlinkio\Shlink\Common\UpdatePublishing\Update; use Shlinkio\Shlink\Core\Entity\ShortUrl; use Shlinkio\Shlink\Core\Entity\Visit; use Shlinkio\Shlink\Core\EventDispatcher\Event\VisitLocated; @@ -42,7 +43,7 @@ class NotifyVisitToRabbitMqTest extends TestCase protected function setUp(): void { - $this->helper = $this->prophesize(RabbitMqPublishingHelperInterface::class); + $this->helper = $this->prophesize(PublishingHelperInterface::class); $this->em = $this->prophesize(EntityManagerInterface::class); $this->logger = $this->prophesize(LoggerInterface::class); $this->options = new RabbitMqOptions(['enabled' => true, 'legacy_visits_publishing' => true]); @@ -67,7 +68,7 @@ class NotifyVisitToRabbitMqTest extends TestCase $this->em->find(Argument::cetera())->shouldNotHaveBeenCalled(); $this->logger->warning(Argument::cetera())->shouldNotHaveBeenCalled(); $this->logger->debug(Argument::cetera())->shouldNotHaveBeenCalled(); - $this->helper->publishPayloadInQueue(Argument::cetera())->shouldNotHaveBeenCalled(); + $this->helper->publishUpdate(Argument::cetera())->shouldNotHaveBeenCalled(); } /** @test */ @@ -85,7 +86,7 @@ class NotifyVisitToRabbitMqTest extends TestCase $findVisit->shouldHaveBeenCalledOnce(); $logWarning->shouldHaveBeenCalledOnce(); $this->logger->debug(Argument::cetera())->shouldNotHaveBeenCalled(); - $this->helper->publishPayloadInQueue(Argument::cetera())->shouldNotHaveBeenCalled(); + $this->helper->publishUpdate(Argument::cetera())->shouldNotHaveBeenCalled(); } /** @@ -97,16 +98,15 @@ class NotifyVisitToRabbitMqTest extends TestCase $visitId = '123'; $findVisit = $this->em->find(Visit::class, $visitId)->willReturn($visit); $argumentWithExpectedChannels = Argument::that( - static fn (string $channel) => contains($expectedChannels, $channel), + static fn (Update $update) => contains($expectedChannels, $update->topic), ); ($this->listener)(new VisitLocated($visitId)); $findVisit->shouldHaveBeenCalledOnce(); - $this->helper->publishPayloadInQueue( - Argument::type('array'), - $argumentWithExpectedChannels, - )->shouldHaveBeenCalledTimes(count($expectedChannels)); + $this->helper->publishUpdate($argumentWithExpectedChannels)->shouldHaveBeenCalledTimes( + count($expectedChannels), + ); $this->logger->debug(Argument::cetera())->shouldNotHaveBeenCalled(); } @@ -135,7 +135,7 @@ class NotifyVisitToRabbitMqTest extends TestCase { $visitId = '123'; $findVisit = $this->em->find(Visit::class, $visitId)->willReturn(Visit::forBasePath(Visitor::emptyInstance())); - $publish = $this->helper->publishPayloadInQueue(Argument::cetera())->willThrow($e); + $publish = $this->helper->publishUpdate(Argument::cetera())->willThrow($e); ($this->listener)(new VisitLocated($visitId)); @@ -171,7 +171,7 @@ class NotifyVisitToRabbitMqTest extends TestCase ($this->listener)(new VisitLocated($visitId)); $findVisit->shouldHaveBeenCalledOnce(); - $this->helper->publishPayloadInQueue(Argument::that($assertPayload), Argument::type('string')) + $this->helper->publishUpdate(Argument::that($assertPayload)) ->shouldHaveBeenCalled(); } @@ -180,7 +180,8 @@ class NotifyVisitToRabbitMqTest extends TestCase yield 'non-legacy non-orphan visit' => [ true, $visit = Visit::forValidShortUrl(ShortUrl::withLongUrl(''), Visitor::emptyInstance()), - function (array $payload) use ($visit): bool { + function (Update $update) use ($visit): bool { + $payload = $update->payload; Assert::assertEquals($payload, $visit->jsonSerialize()); Assert::assertArrayNotHasKey('visitedUrl', $payload); Assert::assertArrayNotHasKey('type', $payload); @@ -193,7 +194,8 @@ class NotifyVisitToRabbitMqTest extends TestCase yield 'non-legacy orphan visit' => [ true, Visit::forBasePath(Visitor::emptyInstance()), - function (array $payload): bool { + function (Update $update): bool { + $payload = $update->payload; Assert::assertArrayHasKey('visitedUrl', $payload); Assert::assertArrayHasKey('type', $payload); @@ -203,7 +205,8 @@ class NotifyVisitToRabbitMqTest extends TestCase yield 'legacy non-orphan visit' => [ false, $visit = Visit::forValidShortUrl(ShortUrl::withLongUrl(''), Visitor::emptyInstance()), - function (array $payload) use ($visit): bool { + function (Update $update) use ($visit): bool { + $payload = $update->payload; Assert::assertArrayHasKey('visit', $payload); Assert::assertArrayHasKey('shortUrl', $payload); Assert::assertIsArray($payload['visit']); @@ -217,7 +220,8 @@ class NotifyVisitToRabbitMqTest extends TestCase yield 'legacy orphan visit' => [ false, Visit::forBasePath(Visitor::emptyInstance()), - function (array $payload): bool { + function (Update $update): bool { + $payload = $update->payload; Assert::assertArrayHasKey('visit', $payload); Assert::assertArrayNotHasKey('shortUrl', $payload); Assert::assertIsArray($payload['visit']); From 1b089749c0bd713be4b138a741fbbb6de3705715 Mon Sep 17 00:00:00 2001 From: Alejandro Celaya Date: Tue, 26 Jul 2022 12:17:37 +0200 Subject: [PATCH 05/19] Migrated mercure event listeners to use new publishing helper from shlink-common --- .../Core/config/event_dispatcher.config.php | 6 ++--- .../Mercure/NotifyNewShortUrlToMercure.php | 6 ++--- .../Mercure/NotifyVisitToMercure.php | 11 +++++---- .../src/Mercure/MercureUpdatesGenerator.php | 24 +++++++++---------- .../MercureUpdatesGeneratorInterface.php | 2 +- .../NotifyNewShortUrlToMercureTest.php | 21 ++++++++-------- .../Mercure/NotifyVisitToMercureTest.php | 24 +++++++++---------- .../Mercure/MercureUpdatesGeneratorTest.php | 16 ++++++------- 8 files changed, 55 insertions(+), 55 deletions(-) diff --git a/module/Core/config/event_dispatcher.config.php b/module/Core/config/event_dispatcher.config.php index 908c8183..4ac8f365 100644 --- a/module/Core/config/event_dispatcher.config.php +++ b/module/Core/config/event_dispatcher.config.php @@ -8,10 +8,10 @@ use Laminas\ServiceManager\AbstractFactory\ConfigAbstractFactory; use Psr\EventDispatcher\EventDispatcherInterface; use Shlinkio\Shlink\CLI\Util\GeolocationDbUpdater; use Shlinkio\Shlink\Common\Cache\RedisPublishingHelper; +use Shlinkio\Shlink\Common\Mercure\MercureHubPublishingHelper; use Shlinkio\Shlink\Common\RabbitMq\RabbitMqPublishingHelper; use Shlinkio\Shlink\IpGeolocation\GeoLite2\DbUpdater; use Shlinkio\Shlink\IpGeolocation\Resolver\IpLocationResolverInterface; -use Symfony\Component\Mercure\Hub; return [ @@ -92,13 +92,13 @@ return [ Options\AppOptions::class, ], EventDispatcher\Mercure\NotifyVisitToMercure::class => [ - Hub::class, + MercureHubPublishingHelper::class, Mercure\MercureUpdatesGenerator::class, 'em', 'Logger_Shlink', ], EventDispatcher\Mercure\NotifyNewShortUrlToMercure::class => [ - Hub::class, + MercureHubPublishingHelper::class, Mercure\MercureUpdatesGenerator::class, 'em', 'Logger_Shlink', diff --git a/module/Core/src/EventDispatcher/Mercure/NotifyNewShortUrlToMercure.php b/module/Core/src/EventDispatcher/Mercure/NotifyNewShortUrlToMercure.php index 8e93d88b..15147403 100644 --- a/module/Core/src/EventDispatcher/Mercure/NotifyNewShortUrlToMercure.php +++ b/module/Core/src/EventDispatcher/Mercure/NotifyNewShortUrlToMercure.php @@ -6,16 +6,16 @@ namespace Shlinkio\Shlink\Core\EventDispatcher\Mercure; use Doctrine\ORM\EntityManagerInterface; use Psr\Log\LoggerInterface; +use Shlinkio\Shlink\Common\UpdatePublishing\PublishingHelperInterface; use Shlinkio\Shlink\Core\Entity\ShortUrl; use Shlinkio\Shlink\Core\EventDispatcher\Event\ShortUrlCreated; use Shlinkio\Shlink\Core\Mercure\MercureUpdatesGeneratorInterface; -use Symfony\Component\Mercure\HubInterface; use Throwable; class NotifyNewShortUrlToMercure { public function __construct( - private readonly HubInterface $hub, + private readonly PublishingHelperInterface $mercureHelper, private readonly MercureUpdatesGeneratorInterface $updatesGenerator, private readonly EntityManagerInterface $em, private readonly LoggerInterface $logger, @@ -36,7 +36,7 @@ class NotifyNewShortUrlToMercure } try { - $this->hub->publish($this->updatesGenerator->newShortUrlUpdate($shortUrl)); + $this->mercureHelper->publishUpdate($this->updatesGenerator->newShortUrlUpdate($shortUrl)); } catch (Throwable $e) { $this->logger->debug('Error while trying to notify mercure hub with new short URL. {e}', ['e' => $e]); } diff --git a/module/Core/src/EventDispatcher/Mercure/NotifyVisitToMercure.php b/module/Core/src/EventDispatcher/Mercure/NotifyVisitToMercure.php index f9610021..6eab680c 100644 --- a/module/Core/src/EventDispatcher/Mercure/NotifyVisitToMercure.php +++ b/module/Core/src/EventDispatcher/Mercure/NotifyVisitToMercure.php @@ -6,11 +6,11 @@ namespace Shlinkio\Shlink\Core\EventDispatcher\Mercure; use Doctrine\ORM\EntityManagerInterface; use Psr\Log\LoggerInterface; +use Shlinkio\Shlink\Common\UpdatePublishing\PublishingHelperInterface; +use Shlinkio\Shlink\Common\UpdatePublishing\Update; use Shlinkio\Shlink\Core\Entity\Visit; use Shlinkio\Shlink\Core\EventDispatcher\Event\VisitLocated; use Shlinkio\Shlink\Core\Mercure\MercureUpdatesGeneratorInterface; -use Symfony\Component\Mercure\HubInterface; -use Symfony\Component\Mercure\Update; use Throwable; use function Functional\each; @@ -18,7 +18,7 @@ use function Functional\each; class NotifyVisitToMercure { public function __construct( - private readonly HubInterface $hub, + private readonly PublishingHelperInterface $mercureHelper, private readonly MercureUpdatesGeneratorInterface $updatesGenerator, private readonly EntityManagerInterface $em, private readonly LoggerInterface $logger, @@ -39,7 +39,10 @@ class NotifyVisitToMercure } try { - each($this->determineUpdatesForVisit($visit), fn (Update $update) => $this->hub->publish($update)); + each( + $this->determineUpdatesForVisit($visit), + fn (Update $update) => $this->mercureHelper->publishUpdate($update), + ); } catch (Throwable $e) { $this->logger->debug('Error while trying to notify mercure hub with new visit. {e}', [ 'e' => $e, diff --git a/module/Core/src/Mercure/MercureUpdatesGenerator.php b/module/Core/src/Mercure/MercureUpdatesGenerator.php index 0f01faa2..055c4fb6 100644 --- a/module/Core/src/Mercure/MercureUpdatesGenerator.php +++ b/module/Core/src/Mercure/MercureUpdatesGenerator.php @@ -5,12 +5,10 @@ declare(strict_types=1); namespace Shlinkio\Shlink\Core\Mercure; use Shlinkio\Shlink\Common\Rest\DataTransformerInterface; +use Shlinkio\Shlink\Common\UpdatePublishing\Update; use Shlinkio\Shlink\Core\Entity\ShortUrl; use Shlinkio\Shlink\Core\Entity\Visit; use Shlinkio\Shlink\Core\EventDispatcher\Topic; -use Symfony\Component\Mercure\Update; - -use function Shlinkio\Shlink\Common\json_encode; final class MercureUpdatesGenerator implements MercureUpdatesGeneratorInterface { @@ -22,17 +20,17 @@ final class MercureUpdatesGenerator implements MercureUpdatesGeneratorInterface public function newVisitUpdate(Visit $visit): Update { - return new Update(Topic::NEW_VISIT->value, json_encode([ + return Update::forTopicAndPayload(Topic::NEW_VISIT->value, [ 'shortUrl' => $this->shortUrlTransformer->transform($visit->getShortUrl()), - 'visit' => $visit, - ])); + 'visit' => $visit->jsonSerialize(), + ]); } public function newOrphanVisitUpdate(Visit $visit): Update { - return new Update(Topic::NEW_ORPHAN_VISIT->value, json_encode([ + return Update::forTopicAndPayload(Topic::NEW_ORPHAN_VISIT->value, [ 'visit' => $this->orphanVisitTransformer->transform($visit), - ])); + ]); } public function newShortUrlVisitUpdate(Visit $visit): Update @@ -40,16 +38,16 @@ final class MercureUpdatesGenerator implements MercureUpdatesGeneratorInterface $shortUrl = $visit->getShortUrl(); $topic = Topic::newShortUrlVisit($shortUrl?->getShortCode()); - return new Update($topic, json_encode([ + return Update::forTopicAndPayload($topic, [ 'shortUrl' => $this->shortUrlTransformer->transform($shortUrl), - 'visit' => $visit, - ])); + 'visit' => $visit->jsonSerialize(), + ]); } public function newShortUrlUpdate(ShortUrl $shortUrl): Update { - return new Update(Topic::NEW_SHORT_URL->value, json_encode([ + return Update::forTopicAndPayload(Topic::NEW_SHORT_URL->value, [ 'shortUrl' => $this->shortUrlTransformer->transform($shortUrl), - ])); + ]); } } diff --git a/module/Core/src/Mercure/MercureUpdatesGeneratorInterface.php b/module/Core/src/Mercure/MercureUpdatesGeneratorInterface.php index ee0cd593..732b6954 100644 --- a/module/Core/src/Mercure/MercureUpdatesGeneratorInterface.php +++ b/module/Core/src/Mercure/MercureUpdatesGeneratorInterface.php @@ -4,9 +4,9 @@ declare(strict_types=1); namespace Shlinkio\Shlink\Core\Mercure; +use Shlinkio\Shlink\Common\UpdatePublishing\Update; use Shlinkio\Shlink\Core\Entity\ShortUrl; use Shlinkio\Shlink\Core\Entity\Visit; -use Symfony\Component\Mercure\Update; interface MercureUpdatesGeneratorInterface { diff --git a/module/Core/test/EventDispatcher/Mercure/NotifyNewShortUrlToMercureTest.php b/module/Core/test/EventDispatcher/Mercure/NotifyNewShortUrlToMercureTest.php index 6bc2d527..d360a15c 100644 --- a/module/Core/test/EventDispatcher/Mercure/NotifyNewShortUrlToMercureTest.php +++ b/module/Core/test/EventDispatcher/Mercure/NotifyNewShortUrlToMercureTest.php @@ -11,32 +11,32 @@ use Prophecy\Argument; use Prophecy\PhpUnit\ProphecyTrait; use Prophecy\Prophecy\ObjectProphecy; use Psr\Log\LoggerInterface; +use Shlinkio\Shlink\Common\UpdatePublishing\PublishingHelperInterface; +use Shlinkio\Shlink\Common\UpdatePublishing\Update; use Shlinkio\Shlink\Core\Entity\ShortUrl; use Shlinkio\Shlink\Core\EventDispatcher\Event\ShortUrlCreated; use Shlinkio\Shlink\Core\EventDispatcher\Mercure\NotifyNewShortUrlToMercure; use Shlinkio\Shlink\Core\Mercure\MercureUpdatesGeneratorInterface; -use Symfony\Component\Mercure\HubInterface; -use Symfony\Component\Mercure\Update; class NotifyNewShortUrlToMercureTest extends TestCase { use ProphecyTrait; private NotifyNewShortUrlToMercure $listener; - private ObjectProphecy $hub; + private ObjectProphecy $helper; private ObjectProphecy $updatesGenerator; private ObjectProphecy $em; private ObjectProphecy $logger; protected function setUp(): void { - $this->hub = $this->prophesize(HubInterface::class); + $this->helper = $this->prophesize(PublishingHelperInterface::class); $this->updatesGenerator = $this->prophesize(MercureUpdatesGeneratorInterface::class); $this->em = $this->prophesize(EntityManagerInterface::class); $this->logger = $this->prophesize(LoggerInterface::class); $this->listener = new NotifyNewShortUrlToMercure( - $this->hub->reveal(), + $this->helper->reveal(), $this->updatesGenerator->reveal(), $this->em->reveal(), $this->logger->reveal(), @@ -55,7 +55,7 @@ class NotifyNewShortUrlToMercureTest extends TestCase 'Tried to notify Mercure for new short URL with id "{shortUrlId}", but it does not exist.', ['shortUrlId' => '123'], )->shouldHaveBeenCalledOnce(); - $this->hub->publish(Argument::cetera())->shouldNotHaveBeenCalled(); + $this->helper->publishUpdate(Argument::cetera())->shouldNotHaveBeenCalled(); $this->updatesGenerator->newShortUrlUpdate(Argument::cetera())->shouldNotHaveBeenCalled(); $this->logger->debug(Argument::cetera())->shouldNotHaveBeenCalled(); } @@ -64,17 +64,16 @@ class NotifyNewShortUrlToMercureTest extends TestCase public function expectedNotificationIsPublished(): void { $shortUrl = ShortUrl::withLongUrl(''); - $update = new Update([]); + $update = Update::forTopicAndPayload('', []); $find = $this->em->find(ShortUrl::class, '123')->willReturn($shortUrl); $newUpdate = $this->updatesGenerator->newShortUrlUpdate($shortUrl)->willReturn($update); - $publish = $this->hub->publish($update)->willReturn(''); ($this->listener)(new ShortUrlCreated('123')); $find->shouldHaveBeenCalledOnce(); $newUpdate->shouldHaveBeenCalledOnce(); - $publish->shouldHaveBeenCalledOnce(); + $this->helper->publishUpdate($update)->shouldHaveBeenCalledOnce(); $this->logger->warning(Argument::cetera())->shouldNotHaveBeenCalled(); $this->logger->debug(Argument::cetera())->shouldNotHaveBeenCalled(); } @@ -83,12 +82,12 @@ class NotifyNewShortUrlToMercureTest extends TestCase public function messageIsPrintedIfPublishingFails(): void { $shortUrl = ShortUrl::withLongUrl(''); - $update = new Update([]); + $update = Update::forTopicAndPayload('', []); $e = new Exception('Error'); $find = $this->em->find(ShortUrl::class, '123')->willReturn($shortUrl); $newUpdate = $this->updatesGenerator->newShortUrlUpdate($shortUrl)->willReturn($update); - $publish = $this->hub->publish($update)->willThrow($e); + $publish = $this->helper->publishUpdate($update)->willThrow($e); ($this->listener)(new ShortUrlCreated('123')); diff --git a/module/Core/test/EventDispatcher/Mercure/NotifyVisitToMercureTest.php b/module/Core/test/EventDispatcher/Mercure/NotifyVisitToMercureTest.php index bdcb72a8..a0ec417c 100644 --- a/module/Core/test/EventDispatcher/Mercure/NotifyVisitToMercureTest.php +++ b/module/Core/test/EventDispatcher/Mercure/NotifyVisitToMercureTest.php @@ -11,6 +11,8 @@ use Prophecy\PhpUnit\ProphecyTrait; use Prophecy\Prophecy\ObjectProphecy; use Psr\Log\LoggerInterface; use RuntimeException; +use Shlinkio\Shlink\Common\UpdatePublishing\PublishingHelperInterface; +use Shlinkio\Shlink\Common\UpdatePublishing\Update; use Shlinkio\Shlink\Core\Entity\ShortUrl; use Shlinkio\Shlink\Core\Entity\Visit; use Shlinkio\Shlink\Core\EventDispatcher\Event\VisitLocated; @@ -18,28 +20,26 @@ use Shlinkio\Shlink\Core\EventDispatcher\Mercure\NotifyVisitToMercure; use Shlinkio\Shlink\Core\Mercure\MercureUpdatesGeneratorInterface; use Shlinkio\Shlink\Core\Model\Visitor; use Shlinkio\Shlink\Core\Visit\Model\VisitType; -use Symfony\Component\Mercure\HubInterface; -use Symfony\Component\Mercure\Update; class NotifyVisitToMercureTest extends TestCase { use ProphecyTrait; private NotifyVisitToMercure $listener; - private ObjectProphecy $hub; + private ObjectProphecy $helper; private ObjectProphecy $updatesGenerator; private ObjectProphecy $em; private ObjectProphecy $logger; public function setUp(): void { - $this->hub = $this->prophesize(HubInterface::class); + $this->helper = $this->prophesize(PublishingHelperInterface::class); $this->updatesGenerator = $this->prophesize(MercureUpdatesGeneratorInterface::class); $this->em = $this->prophesize(EntityManagerInterface::class); $this->logger = $this->prophesize(LoggerInterface::class); $this->listener = new NotifyVisitToMercure( - $this->hub->reveal(), + $this->helper->reveal(), $this->updatesGenerator->reveal(), $this->em->reveal(), $this->logger->reveal(), @@ -61,7 +61,7 @@ class NotifyVisitToMercureTest extends TestCase ); $buildNewOrphanVisitUpdate = $this->updatesGenerator->newOrphanVisitUpdate(Argument::type(Visit::class)); $buildNewVisitUpdate = $this->updatesGenerator->newVisitUpdate(Argument::type(Visit::class)); - $publish = $this->hub->publish(Argument::type(Update::class)); + $publish = $this->helper->publishUpdate(Argument::type(Update::class)); ($this->listener)(new VisitLocated($visitId)); @@ -79,7 +79,7 @@ class NotifyVisitToMercureTest extends TestCase { $visitId = '123'; $visit = Visit::forValidShortUrl(ShortUrl::createEmpty(), Visitor::emptyInstance()); - $update = new Update('', ''); + $update = Update::forTopicAndPayload('', []); $findVisit = $this->em->find(Visit::class, $visitId)->willReturn($visit); $logWarning = $this->logger->warning(Argument::cetera()); @@ -87,7 +87,7 @@ class NotifyVisitToMercureTest extends TestCase $buildNewShortUrlVisitUpdate = $this->updatesGenerator->newShortUrlVisitUpdate($visit)->willReturn($update); $buildNewOrphanVisitUpdate = $this->updatesGenerator->newOrphanVisitUpdate($visit)->willReturn($update); $buildNewVisitUpdate = $this->updatesGenerator->newVisitUpdate($visit)->willReturn($update); - $publish = $this->hub->publish($update); + $publish = $this->helper->publishUpdate($update); ($this->listener)(new VisitLocated($visitId)); @@ -105,7 +105,7 @@ class NotifyVisitToMercureTest extends TestCase { $visitId = '123'; $visit = Visit::forValidShortUrl(ShortUrl::createEmpty(), Visitor::emptyInstance()); - $update = new Update('', ''); + $update = Update::forTopicAndPayload('', []); $e = new RuntimeException('Error'); $findVisit = $this->em->find(Visit::class, $visitId)->willReturn($visit); @@ -116,7 +116,7 @@ class NotifyVisitToMercureTest extends TestCase $buildNewShortUrlVisitUpdate = $this->updatesGenerator->newShortUrlVisitUpdate($visit)->willReturn($update); $buildNewOrphanVisitUpdate = $this->updatesGenerator->newOrphanVisitUpdate($visit)->willReturn($update); $buildNewVisitUpdate = $this->updatesGenerator->newVisitUpdate($visit)->willReturn($update); - $publish = $this->hub->publish($update)->willThrow($e); + $publish = $this->helper->publishUpdate($update)->willThrow($e); ($this->listener)(new VisitLocated($visitId)); @@ -136,7 +136,7 @@ class NotifyVisitToMercureTest extends TestCase public function notificationsAreSentForOrphanVisits(Visit $visit): void { $visitId = '123'; - $update = new Update('', ''); + $update = Update::forTopicAndPayload('', []); $findVisit = $this->em->find(Visit::class, $visitId)->willReturn($visit); $logWarning = $this->logger->warning(Argument::cetera()); @@ -144,7 +144,7 @@ class NotifyVisitToMercureTest extends TestCase $buildNewShortUrlVisitUpdate = $this->updatesGenerator->newShortUrlVisitUpdate($visit)->willReturn($update); $buildNewOrphanVisitUpdate = $this->updatesGenerator->newOrphanVisitUpdate($visit)->willReturn($update); $buildNewVisitUpdate = $this->updatesGenerator->newVisitUpdate($visit)->willReturn($update); - $publish = $this->hub->publish($update); + $publish = $this->helper->publishUpdate($update); ($this->listener)(new VisitLocated($visitId)); diff --git a/module/Core/test/Mercure/MercureUpdatesGeneratorTest.php b/module/Core/test/Mercure/MercureUpdatesGeneratorTest.php index d3521f10..5e5910d7 100644 --- a/module/Core/test/Mercure/MercureUpdatesGeneratorTest.php +++ b/module/Core/test/Mercure/MercureUpdatesGeneratorTest.php @@ -5,6 +5,7 @@ declare(strict_types=1); namespace ShlinkioTest\Shlink\Core\Mercure; use PHPUnit\Framework\TestCase; +use Shlinkio\Shlink\Common\UpdatePublishing\Update; use Shlinkio\Shlink\Core\Entity\ShortUrl; use Shlinkio\Shlink\Core\Entity\Visit; use Shlinkio\Shlink\Core\EventDispatcher\Topic; @@ -16,8 +17,6 @@ use Shlinkio\Shlink\Core\ShortUrl\Transformer\ShortUrlDataTransformer; use Shlinkio\Shlink\Core\Visit\Model\VisitType; use Shlinkio\Shlink\Core\Visit\Transformer\OrphanVisitDataTransformer; -use function Shlinkio\Shlink\Common\json_decode; - class MercureUpdatesGeneratorTest extends TestCase { private MercureUpdatesGenerator $generator; @@ -43,9 +42,10 @@ class MercureUpdatesGeneratorTest extends TestCase ])); $visit = Visit::forValidShortUrl($shortUrl, Visitor::emptyInstance()); + /** @var Update $update */ $update = $this->generator->{$method}($visit); - self::assertEquals([$expectedTopic], $update->getTopics()); + self::assertEquals($expectedTopic, $update->topic); self::assertEquals([ 'shortUrl' => [ 'shortCode' => $shortUrl->getShortCode(), @@ -71,7 +71,7 @@ class MercureUpdatesGeneratorTest extends TestCase 'date' => $visit->getDate()->toAtomString(), 'potentialBot' => false, ], - ], json_decode($update->getData())); + ], $update->payload); } public function provideMethod(): iterable @@ -88,7 +88,7 @@ class MercureUpdatesGeneratorTest extends TestCase { $update = $this->generator->newOrphanVisitUpdate($orphanVisit); - self::assertEquals(['https://shlink.io/new-orphan-visit'], $update->getTopics()); + self::assertEquals('https://shlink.io/new-orphan-visit', $update->topic); self::assertEquals([ 'visit' => [ 'referer' => '', @@ -99,7 +99,7 @@ class MercureUpdatesGeneratorTest extends TestCase 'visitedUrl' => $orphanVisit->visitedUrl(), 'type' => $orphanVisit->type()->value, ], - ], json_decode($update->getData())); + ], $update->payload); } public function provideOrphanVisits(): iterable @@ -122,7 +122,7 @@ class MercureUpdatesGeneratorTest extends TestCase $update = $this->generator->newShortUrlUpdate($shortUrl); - self::assertEquals([Topic::NEW_SHORT_URL->value], $update->getTopics()); + self::assertEquals(Topic::NEW_SHORT_URL->value, $update->topic); self::assertEquals(['shortUrl' => [ 'shortCode' => $shortUrl->getShortCode(), 'shortUrl' => 'http:/' . $shortUrl->getShortCode(), @@ -139,6 +139,6 @@ class MercureUpdatesGeneratorTest extends TestCase 'title' => $shortUrl->title(), 'crawlable' => false, 'forwardQuery' => true, - ],], json_decode($update->getData())); + ]], $update->payload); } } From d3add6d8e43b12a22be89bced63603498562135f Mon Sep 17 00:00:00 2001 From: Alejandro Celaya Date: Tue, 26 Jul 2022 12:18:58 +0200 Subject: [PATCH 06/19] Added TODO --- module/Core/src/Mercure/MercureUpdatesGenerator.php | 1 + 1 file changed, 1 insertion(+) diff --git a/module/Core/src/Mercure/MercureUpdatesGenerator.php b/module/Core/src/Mercure/MercureUpdatesGenerator.php index 055c4fb6..33da0c6d 100644 --- a/module/Core/src/Mercure/MercureUpdatesGenerator.php +++ b/module/Core/src/Mercure/MercureUpdatesGenerator.php @@ -10,6 +10,7 @@ use Shlinkio\Shlink\Core\Entity\ShortUrl; use Shlinkio\Shlink\Core\Entity\Visit; use Shlinkio\Shlink\Core\EventDispatcher\Topic; +// TODO This class can now be use in an agnostic way on all listeners final class MercureUpdatesGenerator implements MercureUpdatesGeneratorInterface { public function __construct( From 7e8109caa3ae1eb7323cbd516b3ff4fa6d5c2e7d Mon Sep 17 00:00:00 2001 From: Alejandro Celaya Date: Wed, 27 Jul 2022 09:38:47 +0200 Subject: [PATCH 07/19] Renamed MercureUpdatesGenerator to PublishingUpdatesGenerator to make it general purpose --- module/Core/config/dependencies.config.php | 4 ++-- module/Core/config/event_dispatcher.config.php | 4 ++-- .../Mercure/NotifyNewShortUrlToMercure.php | 4 ++-- .../EventDispatcher/Mercure/NotifyVisitToMercure.php | 4 ++-- .../PublishingUpdatesGenerator.php} | 6 ++---- .../PublishingUpdatesGeneratorInterface.php} | 4 ++-- .../Visit/Transformer/OrphanVisitDataTransformer.php | 1 - .../Mercure/NotifyNewShortUrlToMercureTest.php | 4 ++-- .../Mercure/NotifyVisitToMercureTest.php | 4 ++-- .../PublishingUpdatesGeneratorTest.php} | 10 +++++----- 10 files changed, 21 insertions(+), 24 deletions(-) rename module/Core/src/{Mercure/MercureUpdatesGenerator.php => EventDispatcher/PublishingUpdatesGenerator.php} (86%) rename module/Core/src/{Mercure/MercureUpdatesGeneratorInterface.php => EventDispatcher/PublishingUpdatesGeneratorInterface.php} (82%) rename module/Core/test/{Mercure/MercureUpdatesGeneratorTest.php => EventDispatcher/PublishingUpdatesGeneratorTest.php} (94%) diff --git a/module/Core/config/dependencies.config.php b/module/Core/config/dependencies.config.php index f4189dde..9edc5fc2 100644 --- a/module/Core/config/dependencies.config.php +++ b/module/Core/config/dependencies.config.php @@ -64,7 +64,7 @@ return [ ShortUrl\Transformer\ShortUrlDataTransformer::class => ConfigAbstractFactory::class, ShortUrl\Middleware\ExtraPathRedirectMiddleware::class => ConfigAbstractFactory::class, - Mercure\MercureUpdatesGenerator::class => ConfigAbstractFactory::class, + EventDispatcher\PublishingUpdatesGenerator::class => ConfigAbstractFactory::class, Importer\ImportedLinksProcessor::class => ConfigAbstractFactory::class, @@ -160,7 +160,7 @@ return [ Options\UrlShortenerOptions::class, ], - Mercure\MercureUpdatesGenerator::class => [ + EventDispatcher\PublishingUpdatesGenerator::class => [ ShortUrl\Transformer\ShortUrlDataTransformer::class, Visit\Transformer\OrphanVisitDataTransformer::class, ], diff --git a/module/Core/config/event_dispatcher.config.php b/module/Core/config/event_dispatcher.config.php index 4ac8f365..d2e3d08f 100644 --- a/module/Core/config/event_dispatcher.config.php +++ b/module/Core/config/event_dispatcher.config.php @@ -93,13 +93,13 @@ return [ ], EventDispatcher\Mercure\NotifyVisitToMercure::class => [ MercureHubPublishingHelper::class, - Mercure\MercureUpdatesGenerator::class, + EventDispatcher\PublishingUpdatesGenerator::class, 'em', 'Logger_Shlink', ], EventDispatcher\Mercure\NotifyNewShortUrlToMercure::class => [ MercureHubPublishingHelper::class, - Mercure\MercureUpdatesGenerator::class, + EventDispatcher\PublishingUpdatesGenerator::class, 'em', 'Logger_Shlink', ], diff --git a/module/Core/src/EventDispatcher/Mercure/NotifyNewShortUrlToMercure.php b/module/Core/src/EventDispatcher/Mercure/NotifyNewShortUrlToMercure.php index 15147403..fba3a57d 100644 --- a/module/Core/src/EventDispatcher/Mercure/NotifyNewShortUrlToMercure.php +++ b/module/Core/src/EventDispatcher/Mercure/NotifyNewShortUrlToMercure.php @@ -9,14 +9,14 @@ use Psr\Log\LoggerInterface; use Shlinkio\Shlink\Common\UpdatePublishing\PublishingHelperInterface; use Shlinkio\Shlink\Core\Entity\ShortUrl; use Shlinkio\Shlink\Core\EventDispatcher\Event\ShortUrlCreated; -use Shlinkio\Shlink\Core\Mercure\MercureUpdatesGeneratorInterface; +use Shlinkio\Shlink\Core\EventDispatcher\PublishingUpdatesGeneratorInterface; use Throwable; class NotifyNewShortUrlToMercure { public function __construct( private readonly PublishingHelperInterface $mercureHelper, - private readonly MercureUpdatesGeneratorInterface $updatesGenerator, + private readonly PublishingUpdatesGeneratorInterface $updatesGenerator, private readonly EntityManagerInterface $em, private readonly LoggerInterface $logger, ) { diff --git a/module/Core/src/EventDispatcher/Mercure/NotifyVisitToMercure.php b/module/Core/src/EventDispatcher/Mercure/NotifyVisitToMercure.php index 6eab680c..cd55fcb2 100644 --- a/module/Core/src/EventDispatcher/Mercure/NotifyVisitToMercure.php +++ b/module/Core/src/EventDispatcher/Mercure/NotifyVisitToMercure.php @@ -10,7 +10,7 @@ use Shlinkio\Shlink\Common\UpdatePublishing\PublishingHelperInterface; use Shlinkio\Shlink\Common\UpdatePublishing\Update; use Shlinkio\Shlink\Core\Entity\Visit; use Shlinkio\Shlink\Core\EventDispatcher\Event\VisitLocated; -use Shlinkio\Shlink\Core\Mercure\MercureUpdatesGeneratorInterface; +use Shlinkio\Shlink\Core\EventDispatcher\PublishingUpdatesGeneratorInterface; use Throwable; use function Functional\each; @@ -19,7 +19,7 @@ class NotifyVisitToMercure { public function __construct( private readonly PublishingHelperInterface $mercureHelper, - private readonly MercureUpdatesGeneratorInterface $updatesGenerator, + private readonly PublishingUpdatesGeneratorInterface $updatesGenerator, private readonly EntityManagerInterface $em, private readonly LoggerInterface $logger, ) { diff --git a/module/Core/src/Mercure/MercureUpdatesGenerator.php b/module/Core/src/EventDispatcher/PublishingUpdatesGenerator.php similarity index 86% rename from module/Core/src/Mercure/MercureUpdatesGenerator.php rename to module/Core/src/EventDispatcher/PublishingUpdatesGenerator.php index 33da0c6d..0f7de480 100644 --- a/module/Core/src/Mercure/MercureUpdatesGenerator.php +++ b/module/Core/src/EventDispatcher/PublishingUpdatesGenerator.php @@ -2,16 +2,14 @@ declare(strict_types=1); -namespace Shlinkio\Shlink\Core\Mercure; +namespace Shlinkio\Shlink\Core\EventDispatcher; use Shlinkio\Shlink\Common\Rest\DataTransformerInterface; use Shlinkio\Shlink\Common\UpdatePublishing\Update; use Shlinkio\Shlink\Core\Entity\ShortUrl; use Shlinkio\Shlink\Core\Entity\Visit; -use Shlinkio\Shlink\Core\EventDispatcher\Topic; -// TODO This class can now be use in an agnostic way on all listeners -final class MercureUpdatesGenerator implements MercureUpdatesGeneratorInterface +final class PublishingUpdatesGenerator implements PublishingUpdatesGeneratorInterface { public function __construct( private readonly DataTransformerInterface $shortUrlTransformer, diff --git a/module/Core/src/Mercure/MercureUpdatesGeneratorInterface.php b/module/Core/src/EventDispatcher/PublishingUpdatesGeneratorInterface.php similarity index 82% rename from module/Core/src/Mercure/MercureUpdatesGeneratorInterface.php rename to module/Core/src/EventDispatcher/PublishingUpdatesGeneratorInterface.php index 732b6954..826157eb 100644 --- a/module/Core/src/Mercure/MercureUpdatesGeneratorInterface.php +++ b/module/Core/src/EventDispatcher/PublishingUpdatesGeneratorInterface.php @@ -2,13 +2,13 @@ declare(strict_types=1); -namespace Shlinkio\Shlink\Core\Mercure; +namespace Shlinkio\Shlink\Core\EventDispatcher; use Shlinkio\Shlink\Common\UpdatePublishing\Update; use Shlinkio\Shlink\Core\Entity\ShortUrl; use Shlinkio\Shlink\Core\Entity\Visit; -interface MercureUpdatesGeneratorInterface +interface PublishingUpdatesGeneratorInterface { public function newVisitUpdate(Visit $visit): Update; diff --git a/module/Core/src/Visit/Transformer/OrphanVisitDataTransformer.php b/module/Core/src/Visit/Transformer/OrphanVisitDataTransformer.php index c9d30b8d..0da5f4ba 100644 --- a/module/Core/src/Visit/Transformer/OrphanVisitDataTransformer.php +++ b/module/Core/src/Visit/Transformer/OrphanVisitDataTransformer.php @@ -11,7 +11,6 @@ class OrphanVisitDataTransformer implements DataTransformerInterface { /** * @param Visit $visit - * @return array */ public function transform($visit): array // phpcs:ignore { diff --git a/module/Core/test/EventDispatcher/Mercure/NotifyNewShortUrlToMercureTest.php b/module/Core/test/EventDispatcher/Mercure/NotifyNewShortUrlToMercureTest.php index d360a15c..d0c89b3b 100644 --- a/module/Core/test/EventDispatcher/Mercure/NotifyNewShortUrlToMercureTest.php +++ b/module/Core/test/EventDispatcher/Mercure/NotifyNewShortUrlToMercureTest.php @@ -16,7 +16,7 @@ use Shlinkio\Shlink\Common\UpdatePublishing\Update; use Shlinkio\Shlink\Core\Entity\ShortUrl; use Shlinkio\Shlink\Core\EventDispatcher\Event\ShortUrlCreated; use Shlinkio\Shlink\Core\EventDispatcher\Mercure\NotifyNewShortUrlToMercure; -use Shlinkio\Shlink\Core\Mercure\MercureUpdatesGeneratorInterface; +use Shlinkio\Shlink\Core\EventDispatcher\PublishingUpdatesGeneratorInterface; class NotifyNewShortUrlToMercureTest extends TestCase { @@ -31,7 +31,7 @@ class NotifyNewShortUrlToMercureTest extends TestCase protected function setUp(): void { $this->helper = $this->prophesize(PublishingHelperInterface::class); - $this->updatesGenerator = $this->prophesize(MercureUpdatesGeneratorInterface::class); + $this->updatesGenerator = $this->prophesize(PublishingUpdatesGeneratorInterface::class); $this->em = $this->prophesize(EntityManagerInterface::class); $this->logger = $this->prophesize(LoggerInterface::class); diff --git a/module/Core/test/EventDispatcher/Mercure/NotifyVisitToMercureTest.php b/module/Core/test/EventDispatcher/Mercure/NotifyVisitToMercureTest.php index a0ec417c..65049f49 100644 --- a/module/Core/test/EventDispatcher/Mercure/NotifyVisitToMercureTest.php +++ b/module/Core/test/EventDispatcher/Mercure/NotifyVisitToMercureTest.php @@ -17,7 +17,7 @@ use Shlinkio\Shlink\Core\Entity\ShortUrl; use Shlinkio\Shlink\Core\Entity\Visit; use Shlinkio\Shlink\Core\EventDispatcher\Event\VisitLocated; use Shlinkio\Shlink\Core\EventDispatcher\Mercure\NotifyVisitToMercure; -use Shlinkio\Shlink\Core\Mercure\MercureUpdatesGeneratorInterface; +use Shlinkio\Shlink\Core\EventDispatcher\PublishingUpdatesGeneratorInterface; use Shlinkio\Shlink\Core\Model\Visitor; use Shlinkio\Shlink\Core\Visit\Model\VisitType; @@ -34,7 +34,7 @@ class NotifyVisitToMercureTest extends TestCase public function setUp(): void { $this->helper = $this->prophesize(PublishingHelperInterface::class); - $this->updatesGenerator = $this->prophesize(MercureUpdatesGeneratorInterface::class); + $this->updatesGenerator = $this->prophesize(PublishingUpdatesGeneratorInterface::class); $this->em = $this->prophesize(EntityManagerInterface::class); $this->logger = $this->prophesize(LoggerInterface::class); diff --git a/module/Core/test/Mercure/MercureUpdatesGeneratorTest.php b/module/Core/test/EventDispatcher/PublishingUpdatesGeneratorTest.php similarity index 94% rename from module/Core/test/Mercure/MercureUpdatesGeneratorTest.php rename to module/Core/test/EventDispatcher/PublishingUpdatesGeneratorTest.php index 5e5910d7..e4b616e8 100644 --- a/module/Core/test/Mercure/MercureUpdatesGeneratorTest.php +++ b/module/Core/test/EventDispatcher/PublishingUpdatesGeneratorTest.php @@ -2,14 +2,14 @@ declare(strict_types=1); -namespace ShlinkioTest\Shlink\Core\Mercure; +namespace ShlinkioTest\Shlink\Core\EventDispatcher; use PHPUnit\Framework\TestCase; use Shlinkio\Shlink\Common\UpdatePublishing\Update; use Shlinkio\Shlink\Core\Entity\ShortUrl; use Shlinkio\Shlink\Core\Entity\Visit; +use Shlinkio\Shlink\Core\EventDispatcher\PublishingUpdatesGenerator; use Shlinkio\Shlink\Core\EventDispatcher\Topic; -use Shlinkio\Shlink\Core\Mercure\MercureUpdatesGenerator; use Shlinkio\Shlink\Core\Model\ShortUrlMeta; use Shlinkio\Shlink\Core\Model\Visitor; use Shlinkio\Shlink\Core\ShortUrl\Helper\ShortUrlStringifier; @@ -17,13 +17,13 @@ use Shlinkio\Shlink\Core\ShortUrl\Transformer\ShortUrlDataTransformer; use Shlinkio\Shlink\Core\Visit\Model\VisitType; use Shlinkio\Shlink\Core\Visit\Transformer\OrphanVisitDataTransformer; -class MercureUpdatesGeneratorTest extends TestCase +class PublishingUpdatesGeneratorTest extends TestCase { - private MercureUpdatesGenerator $generator; + private PublishingUpdatesGenerator $generator; public function setUp(): void { - $this->generator = new MercureUpdatesGenerator( + $this->generator = new PublishingUpdatesGenerator( new ShortUrlDataTransformer(new ShortUrlStringifier([])), new OrphanVisitDataTransformer(), ); From 3c042c40113fa4dd9592c3f7f5c00f3a1e2cd47d Mon Sep 17 00:00:00 2001 From: Alejandro Celaya Date: Wed, 27 Jul 2022 10:18:28 +0200 Subject: [PATCH 08/19] Integrated PublishUpdatesGenerator in NotifyNewShortUrlToRabbitMq listener --- module/Core/config/event_dispatcher.config.php | 2 +- .../RabbitMq/NotifyNewShortUrlToRabbitMq.php | 11 +++-------- .../RabbitMq/NotifyNewShortUrlToRabbitMqTest.php | 16 ++++++++++------ 3 files changed, 14 insertions(+), 15 deletions(-) diff --git a/module/Core/config/event_dispatcher.config.php b/module/Core/config/event_dispatcher.config.php index d2e3d08f..4a9dc32d 100644 --- a/module/Core/config/event_dispatcher.config.php +++ b/module/Core/config/event_dispatcher.config.php @@ -113,9 +113,9 @@ return [ ], EventDispatcher\RabbitMq\NotifyNewShortUrlToRabbitMq::class => [ RabbitMqPublishingHelper::class, + EventDispatcher\PublishingUpdatesGenerator::class, 'em', 'Logger_Shlink', - ShortUrl\Transformer\ShortUrlDataTransformer::class, Options\RabbitMqOptions::class, ], EventDispatcher\RedisPubSub\NotifyVisitToRedis::class => [ diff --git a/module/Core/src/EventDispatcher/RabbitMq/NotifyNewShortUrlToRabbitMq.php b/module/Core/src/EventDispatcher/RabbitMq/NotifyNewShortUrlToRabbitMq.php index 73ed84ca..ad562d2a 100644 --- a/module/Core/src/EventDispatcher/RabbitMq/NotifyNewShortUrlToRabbitMq.php +++ b/module/Core/src/EventDispatcher/RabbitMq/NotifyNewShortUrlToRabbitMq.php @@ -6,12 +6,10 @@ namespace Shlinkio\Shlink\Core\EventDispatcher\RabbitMq; use Doctrine\ORM\EntityManagerInterface; use Psr\Log\LoggerInterface; -use Shlinkio\Shlink\Common\Rest\DataTransformerInterface; use Shlinkio\Shlink\Common\UpdatePublishing\PublishingHelperInterface; -use Shlinkio\Shlink\Common\UpdatePublishing\Update; use Shlinkio\Shlink\Core\Entity\ShortUrl; use Shlinkio\Shlink\Core\EventDispatcher\Event\ShortUrlCreated; -use Shlinkio\Shlink\Core\EventDispatcher\Topic; +use Shlinkio\Shlink\Core\EventDispatcher\PublishingUpdatesGeneratorInterface; use Shlinkio\Shlink\Core\Options\RabbitMqOptions; use Throwable; @@ -19,9 +17,9 @@ class NotifyNewShortUrlToRabbitMq { public function __construct( private readonly PublishingHelperInterface $rabbitMqHelper, + private readonly PublishingUpdatesGeneratorInterface $updatesGenerator, private readonly EntityManagerInterface $em, private readonly LoggerInterface $logger, - private readonly DataTransformerInterface $shortUrlTransformer, private readonly RabbitMqOptions $options, ) { } @@ -44,10 +42,7 @@ class NotifyNewShortUrlToRabbitMq } try { - $this->rabbitMqHelper->publishUpdate(Update::forTopicAndPayload( - Topic::NEW_SHORT_URL->value, - ['shortUrl' => $this->shortUrlTransformer->transform($shortUrl)], - )); + $this->rabbitMqHelper->publishUpdate($this->updatesGenerator->newShortUrlUpdate($shortUrl)); } catch (Throwable $e) { $this->logger->debug('Error while trying to notify RabbitMQ with new short URL. {e}', ['e' => $e]); } diff --git a/module/Core/test/EventDispatcher/RabbitMq/NotifyNewShortUrlToRabbitMqTest.php b/module/Core/test/EventDispatcher/RabbitMq/NotifyNewShortUrlToRabbitMqTest.php index 1f23b18b..ba735d6a 100644 --- a/module/Core/test/EventDispatcher/RabbitMq/NotifyNewShortUrlToRabbitMqTest.php +++ b/module/Core/test/EventDispatcher/RabbitMq/NotifyNewShortUrlToRabbitMqTest.php @@ -17,11 +17,10 @@ use Shlinkio\Shlink\Common\UpdatePublishing\PublishingHelperInterface; use Shlinkio\Shlink\Common\UpdatePublishing\Update; use Shlinkio\Shlink\Core\Entity\ShortUrl; use Shlinkio\Shlink\Core\EventDispatcher\Event\ShortUrlCreated; +use Shlinkio\Shlink\Core\EventDispatcher\PublishingUpdatesGeneratorInterface; use Shlinkio\Shlink\Core\EventDispatcher\RabbitMq\NotifyNewShortUrlToRabbitMq; use Shlinkio\Shlink\Core\EventDispatcher\Topic; use Shlinkio\Shlink\Core\Options\RabbitMqOptions; -use Shlinkio\Shlink\Core\ShortUrl\Helper\ShortUrlStringifier; -use Shlinkio\Shlink\Core\ShortUrl\Transformer\ShortUrlDataTransformer; use Throwable; class NotifyNewShortUrlToRabbitMqTest extends TestCase @@ -30,6 +29,7 @@ class NotifyNewShortUrlToRabbitMqTest extends TestCase private NotifyNewShortUrlToRabbitMq $listener; private ObjectProphecy $helper; + private ObjectProphecy $updatesGenerator; private ObjectProphecy $em; private ObjectProphecy $logger; private RabbitMqOptions $options; @@ -37,15 +37,16 @@ class NotifyNewShortUrlToRabbitMqTest extends TestCase protected function setUp(): void { $this->helper = $this->prophesize(PublishingHelperInterface::class); + $this->updatesGenerator = $this->prophesize(PublishingUpdatesGeneratorInterface::class); $this->em = $this->prophesize(EntityManagerInterface::class); $this->logger = $this->prophesize(LoggerInterface::class); $this->options = new RabbitMqOptions(['enabled' => true]); $this->listener = new NotifyNewShortUrlToRabbitMq( $this->helper->reveal(), + $this->updatesGenerator->reveal(), $this->em->reveal(), $this->logger->reveal(), - new ShortUrlDataTransformer(new ShortUrlStringifier([])), $this->options, ); } @@ -85,14 +86,17 @@ class NotifyNewShortUrlToRabbitMqTest extends TestCase public function expectedChannelIsNotified(): void { $shortUrlId = '123'; + $update = Update::forTopicAndPayload(Topic::NEW_SHORT_URL->value, []); $find = $this->em->find(ShortUrl::class, $shortUrlId)->willReturn(ShortUrl::withLongUrl('')); + $generateUpdate = $this->updatesGenerator->newShortUrlUpdate(Argument::type(ShortUrl::class))->willReturn( + $update, + ); ($this->listener)(new ShortUrlCreated($shortUrlId)); $find->shouldHaveBeenCalledOnce(); - $this->helper->publishUpdate( - Argument::that(fn (Update $update) => $update->topic === Topic::NEW_SHORT_URL->value), - )->shouldHaveBeenCalledOnce(); + $generateUpdate->shouldHaveBeenCalledOnce(); + $this->helper->publishUpdate($update)->shouldHaveBeenCalledOnce(); $this->logger->debug(Argument::cetera())->shouldNotHaveBeenCalled(); } From f071df325d5a3943ce59f1f137c8ed2ff7c5edb0 Mon Sep 17 00:00:00 2001 From: Alejandro Celaya Date: Wed, 27 Jul 2022 10:26:18 +0200 Subject: [PATCH 09/19] Fixed NotifyNewShortUrlToRabbitMqTest --- .../RabbitMq/NotifyNewShortUrlToRabbitMqTest.php | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/module/Core/test/EventDispatcher/RabbitMq/NotifyNewShortUrlToRabbitMqTest.php b/module/Core/test/EventDispatcher/RabbitMq/NotifyNewShortUrlToRabbitMqTest.php index ba735d6a..3bb46286 100644 --- a/module/Core/test/EventDispatcher/RabbitMq/NotifyNewShortUrlToRabbitMqTest.php +++ b/module/Core/test/EventDispatcher/RabbitMq/NotifyNewShortUrlToRabbitMqTest.php @@ -107,8 +107,12 @@ class NotifyNewShortUrlToRabbitMqTest extends TestCase public function printsDebugMessageInCaseOfError(Throwable $e): void { $shortUrlId = '123'; + $update = Update::forTopicAndPayload(Topic::NEW_SHORT_URL->value, []); $find = $this->em->find(ShortUrl::class, $shortUrlId)->willReturn(ShortUrl::withLongUrl('')); - $publish = $this->helper->publishUpdate(Argument::cetera())->willThrow($e); + $generateUpdate = $this->updatesGenerator->newShortUrlUpdate(Argument::type(ShortUrl::class))->willReturn( + $update, + ); + $publish = $this->helper->publishUpdate($update)->willThrow($e); ($this->listener)(new ShortUrlCreated($shortUrlId)); @@ -117,6 +121,7 @@ class NotifyNewShortUrlToRabbitMqTest extends TestCase ['e' => $e], )->shouldHaveBeenCalledOnce(); $find->shouldHaveBeenCalledOnce(); + $generateUpdate->shouldHaveBeenCalledOnce(); $publish->shouldHaveBeenCalledOnce(); } From fa5ebb16770e2700c005e2c17a8745a035e6eda4 Mon Sep 17 00:00:00 2001 From: Alejandro Celaya Date: Wed, 27 Jul 2022 16:47:21 +0200 Subject: [PATCH 10/19] Integrated PublishUpdatesGenerator in NotifyNewShortUrlToRedis listener --- module/Core/config/event_dispatcher.config.php | 2 +- .../RedisPubSub/NotifyNewShortUrlToRedis.php | 11 +++-------- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/module/Core/config/event_dispatcher.config.php b/module/Core/config/event_dispatcher.config.php index 4a9dc32d..441a95cf 100644 --- a/module/Core/config/event_dispatcher.config.php +++ b/module/Core/config/event_dispatcher.config.php @@ -128,9 +128,9 @@ return [ ], EventDispatcher\RedisPubSub\NotifyNewShortUrlToRedis::class => [ RedisPublishingHelper::class, + EventDispatcher\PublishingUpdatesGenerator::class, 'em', 'Logger_Shlink', - ShortUrl\Transformer\ShortUrlDataTransformer::class, 'config.redis.pub_sub_enabled', ], EventDispatcher\UpdateGeoLiteDb::class => [GeolocationDbUpdater::class, 'Logger_Shlink'], diff --git a/module/Core/src/EventDispatcher/RedisPubSub/NotifyNewShortUrlToRedis.php b/module/Core/src/EventDispatcher/RedisPubSub/NotifyNewShortUrlToRedis.php index 638aa88a..4a56858e 100644 --- a/module/Core/src/EventDispatcher/RedisPubSub/NotifyNewShortUrlToRedis.php +++ b/module/Core/src/EventDispatcher/RedisPubSub/NotifyNewShortUrlToRedis.php @@ -6,21 +6,19 @@ namespace Shlinkio\Shlink\Core\EventDispatcher\RedisPubSub; use Doctrine\ORM\EntityManagerInterface; use Psr\Log\LoggerInterface; -use Shlinkio\Shlink\Common\Rest\DataTransformerInterface; use Shlinkio\Shlink\Common\UpdatePublishing\PublishingHelperInterface; -use Shlinkio\Shlink\Common\UpdatePublishing\Update; use Shlinkio\Shlink\Core\Entity\ShortUrl; use Shlinkio\Shlink\Core\EventDispatcher\Event\ShortUrlCreated; -use Shlinkio\Shlink\Core\EventDispatcher\Topic; +use Shlinkio\Shlink\Core\EventDispatcher\PublishingUpdatesGeneratorInterface; use Throwable; class NotifyNewShortUrlToRedis { public function __construct( private readonly PublishingHelperInterface $redisHelper, + private readonly PublishingUpdatesGeneratorInterface $updatesGenerator, private readonly EntityManagerInterface $em, private readonly LoggerInterface $logger, - private readonly DataTransformerInterface $shortUrlTransformer, private readonly bool $enabled, ) { } @@ -43,10 +41,7 @@ class NotifyNewShortUrlToRedis } try { - $this->redisHelper->publishUpdate(Update::forTopicAndPayload( - Topic::NEW_SHORT_URL->value, - ['shortUrl' => $this->shortUrlTransformer->transform($shortUrl)], - )); + $this->redisHelper->publishUpdate($this->updatesGenerator->newShortUrlUpdate($shortUrl)); } catch (Throwable $e) { $this->logger->debug('Error while trying to notify Redis pub/sub with new short URL. {e}', ['e' => $e]); } From dada6aa3d10ee187acd68b445543e71481786e2f Mon Sep 17 00:00:00 2001 From: Alejandro Celaya Date: Wed, 27 Jul 2022 16:55:19 +0200 Subject: [PATCH 11/19] Integrated PublishUpdatesGenerator in NotifyVisitToRedis listener --- .../Core/config/event_dispatcher.config.php | 3 +- .../RedisPubSub/NotifyVisitToRedis.php | 35 +++++-------------- 2 files changed, 10 insertions(+), 28 deletions(-) diff --git a/module/Core/config/event_dispatcher.config.php b/module/Core/config/event_dispatcher.config.php index 441a95cf..877269c9 100644 --- a/module/Core/config/event_dispatcher.config.php +++ b/module/Core/config/event_dispatcher.config.php @@ -120,10 +120,9 @@ return [ ], EventDispatcher\RedisPubSub\NotifyVisitToRedis::class => [ RedisPublishingHelper::class, + EventDispatcher\PublishingUpdatesGenerator::class, 'em', 'Logger_Shlink', - Visit\Transformer\OrphanVisitDataTransformer::class, - ShortUrl\Transformer\ShortUrlDataTransformer::class, 'config.redis.pub_sub_enabled', ], EventDispatcher\RedisPubSub\NotifyNewShortUrlToRedis::class => [ diff --git a/module/Core/src/EventDispatcher/RedisPubSub/NotifyVisitToRedis.php b/module/Core/src/EventDispatcher/RedisPubSub/NotifyVisitToRedis.php index 3df32267..820921db 100644 --- a/module/Core/src/EventDispatcher/RedisPubSub/NotifyVisitToRedis.php +++ b/module/Core/src/EventDispatcher/RedisPubSub/NotifyVisitToRedis.php @@ -6,12 +6,11 @@ namespace Shlinkio\Shlink\Core\EventDispatcher\RedisPubSub; use Doctrine\ORM\EntityManagerInterface; use Psr\Log\LoggerInterface; -use Shlinkio\Shlink\Common\Rest\DataTransformerInterface; use Shlinkio\Shlink\Common\UpdatePublishing\PublishingHelperInterface; use Shlinkio\Shlink\Common\UpdatePublishing\Update; use Shlinkio\Shlink\Core\Entity\Visit; use Shlinkio\Shlink\Core\EventDispatcher\Event\VisitLocated; -use Shlinkio\Shlink\Core\EventDispatcher\Topic; +use Shlinkio\Shlink\Core\EventDispatcher\PublishingUpdatesGeneratorInterface; use Throwable; use function Functional\each; @@ -20,10 +19,9 @@ class NotifyVisitToRedis { public function __construct( private readonly PublishingHelperInterface $redisHelper, + private readonly PublishingUpdatesGeneratorInterface $updatesGenerator, private readonly EntityManagerInterface $em, private readonly LoggerInterface $logger, - private readonly DataTransformerInterface $orphanVisitTransformer, - private readonly DataTransformerInterface $shortUrlTransformer, private readonly bool $enabled, ) { } @@ -45,42 +43,27 @@ class NotifyVisitToRedis return; } - $queues = $this->determineQueuesToPublishTo($visit); - $payload = $this->visitToPayload($visit); + $updates = $this->determineUpdatesForVisit($visit); try { - each($queues, fn (string $queue) => $this->redisHelper->publishUpdate( - Update::forTopicAndPayload($queue, $payload), - )); + each($updates, fn (Update $update) => $this->redisHelper->publishUpdate($update)); } catch (Throwable $e) { $this->logger->debug('Error while trying to notify Redis pub/sub with new visit. {e}', ['e' => $e]); } } /** - * @return string[] + * @return Update[] */ - private function determineQueuesToPublishTo(Visit $visit): array + private function determineUpdatesForVisit(Visit $visit): array { if ($visit->isOrphan()) { - return [Topic::NEW_ORPHAN_VISIT->value]; + return [$this->updatesGenerator->newOrphanVisitUpdate($visit)]; } return [ - Topic::NEW_VISIT->value, - Topic::newShortUrlVisit($visit->getShortUrl()?->getShortCode()), - ]; - } - - private function visitToPayload(Visit $visit): array - { - if ($visit->isOrphan()) { - return ['visit' => $this->orphanVisitTransformer->transform($visit)]; - } - - return [ - 'visit' => $visit->jsonSerialize(), - 'shortUrl' => $this->shortUrlTransformer->transform($visit->getShortUrl()), + $this->updatesGenerator->newShortUrlVisitUpdate($visit), + $this->updatesGenerator->newVisitUpdate($visit), ]; } } From da6aa1d697ac82bf766396b05122a7231da86fe0 Mon Sep 17 00:00:00 2001 From: Alejandro Celaya Date: Wed, 27 Jul 2022 17:41:48 +0200 Subject: [PATCH 12/19] Integrated PublishUpdatesGenerator in NotifyVisitToRabbitMq listener --- .../Core/config/event_dispatcher.config.php | 2 +- .../RabbitMq/NotifyVisitToRabbitMq.php | 66 +++++---- .../RabbitMq/NotifyVisitToRabbitMqTest.php | 131 ++++++++++-------- 3 files changed, 105 insertions(+), 94 deletions(-) diff --git a/module/Core/config/event_dispatcher.config.php b/module/Core/config/event_dispatcher.config.php index 877269c9..467f63cc 100644 --- a/module/Core/config/event_dispatcher.config.php +++ b/module/Core/config/event_dispatcher.config.php @@ -105,10 +105,10 @@ return [ ], EventDispatcher\RabbitMq\NotifyVisitToRabbitMq::class => [ RabbitMqPublishingHelper::class, + EventDispatcher\PublishingUpdatesGenerator::class, 'em', 'Logger_Shlink', Visit\Transformer\OrphanVisitDataTransformer::class, - ShortUrl\Transformer\ShortUrlDataTransformer::class, Options\RabbitMqOptions::class, ], EventDispatcher\RabbitMq\NotifyNewShortUrlToRabbitMq::class => [ diff --git a/module/Core/src/EventDispatcher/RabbitMq/NotifyVisitToRabbitMq.php b/module/Core/src/EventDispatcher/RabbitMq/NotifyVisitToRabbitMq.php index 8f8567db..51c9f423 100644 --- a/module/Core/src/EventDispatcher/RabbitMq/NotifyVisitToRabbitMq.php +++ b/module/Core/src/EventDispatcher/RabbitMq/NotifyVisitToRabbitMq.php @@ -11,6 +11,7 @@ use Shlinkio\Shlink\Common\UpdatePublishing\PublishingHelperInterface; use Shlinkio\Shlink\Common\UpdatePublishing\Update; use Shlinkio\Shlink\Core\Entity\Visit; use Shlinkio\Shlink\Core\EventDispatcher\Event\VisitLocated; +use Shlinkio\Shlink\Core\EventDispatcher\PublishingUpdatesGeneratorInterface; use Shlinkio\Shlink\Core\EventDispatcher\Topic; use Shlinkio\Shlink\Core\Options\RabbitMqOptions; use Throwable; @@ -21,10 +22,10 @@ class NotifyVisitToRabbitMq { public function __construct( private readonly PublishingHelperInterface $rabbitMqHelper, + private readonly PublishingUpdatesGeneratorInterface $updatesGenerator, private readonly EntityManagerInterface $em, private readonly LoggerInterface $logger, private readonly DataTransformerInterface $orphanVisitTransformer, - private readonly DataTransformerInterface $shortUrlTransformer, private readonly RabbitMqOptions $options, ) { } @@ -45,50 +46,45 @@ class NotifyVisitToRabbitMq return; } - $queues = $this->determineQueuesToPublishTo($visit); - $payload = $this->visitToPayload($visit); + $updates = $this->determineUpdatesForVisit($visit); try { - each($queues, fn (string $queue) => $this->rabbitMqHelper->publishUpdate( - Update::forTopicAndPayload($queue, $payload), - )); + each($updates, fn (Update $update) => $this->rabbitMqHelper->publishUpdate($update)); } catch (Throwable $e) { $this->logger->debug('Error while trying to notify RabbitMQ with new visit. {e}', ['e' => $e]); } } /** - * @return string[] + * @return Update[] */ - private function determineQueuesToPublishTo(Visit $visit): array + private function determineUpdatesForVisit(Visit $visit): array { - if ($visit->isOrphan()) { - return [Topic::NEW_ORPHAN_VISIT->value]; - } + return match (true) { + // This was defined incorrectly. + // According to the spec, both the visit and the short URL it belongs to, should be published. + // The shape should be ['visit' => [...], 'shortUrl' => ?[...]] + // However, this would be a breaking change, so we need a flag that determines the shape of the payload. + $this->options->legacyVisitsPublishing() && $visit->isOrphan() => [ + Update::forTopicAndPayload( + Topic::NEW_ORPHAN_VISIT->value, + $this->orphanVisitTransformer->transform($visit), + ), + ], + $this->options->legacyVisitsPublishing() && ! $visit->isOrphan() => [ + Update::forTopicAndPayload(Topic::NEW_VISIT->value, $visit->jsonSerialize()), + Update::forTopicAndPayload( + Topic::newShortUrlVisit($visit->getShortUrl()?->getShortCode()), + $visit->jsonSerialize(), + ), + ], - return [ - Topic::NEW_VISIT->value, - Topic::newShortUrlVisit($visit->getShortUrl()?->getShortCode()), - ]; - } - - private function visitToPayload(Visit $visit): array - { - // This was defined incorrectly. - // According to the spec, both the visit and the short URL it belongs to, should be published. - // The shape should be ['visit' => [...], 'shortUrl' => ?[...]] - // However, this would be a breaking change, so we need a flag that determines the shape of the payload. - if ($this->options->legacyVisitsPublishing()) { - return ! $visit->isOrphan() ? $visit->jsonSerialize() : $this->orphanVisitTransformer->transform($visit); - } - - if ($visit->isOrphan()) { - return ['visit' => $this->orphanVisitTransformer->transform($visit)]; - } - - return [ - 'visit' => $visit->jsonSerialize(), - 'shortUrl' => $this->shortUrlTransformer->transform($visit->getShortUrl()), - ]; + // Once the two deprecated cases above have been remove, replace this with a simple "if" and early return. + $visit->isOrphan() => [$this->updatesGenerator->newOrphanVisitUpdate($visit)], + default => [ + $this->updatesGenerator->newShortUrlVisitUpdate($visit), + $this->updatesGenerator->newVisitUpdate($visit), + ], + }; } } diff --git a/module/Core/test/EventDispatcher/RabbitMq/NotifyVisitToRabbitMqTest.php b/module/Core/test/EventDispatcher/RabbitMq/NotifyVisitToRabbitMqTest.php index ada7e551..f2e31072 100644 --- a/module/Core/test/EventDispatcher/RabbitMq/NotifyVisitToRabbitMqTest.php +++ b/module/Core/test/EventDispatcher/RabbitMq/NotifyVisitToRabbitMqTest.php @@ -19,17 +19,17 @@ use Shlinkio\Shlink\Common\UpdatePublishing\Update; use Shlinkio\Shlink\Core\Entity\ShortUrl; use Shlinkio\Shlink\Core\Entity\Visit; use Shlinkio\Shlink\Core\EventDispatcher\Event\VisitLocated; +use Shlinkio\Shlink\Core\EventDispatcher\PublishingUpdatesGeneratorInterface; use Shlinkio\Shlink\Core\EventDispatcher\RabbitMq\NotifyVisitToRabbitMq; use Shlinkio\Shlink\Core\Model\ShortUrlMeta; use Shlinkio\Shlink\Core\Model\Visitor; use Shlinkio\Shlink\Core\Options\RabbitMqOptions; -use Shlinkio\Shlink\Core\ShortUrl\Helper\ShortUrlStringifier; -use Shlinkio\Shlink\Core\ShortUrl\Transformer\ShortUrlDataTransformer; use Shlinkio\Shlink\Core\Visit\Transformer\OrphanVisitDataTransformer; use Throwable; use function count; -use function Functional\contains; +use function Functional\each; +use function Functional\noop; class NotifyVisitToRabbitMqTest extends TestCase { @@ -37,6 +37,7 @@ class NotifyVisitToRabbitMqTest extends TestCase private NotifyVisitToRabbitMq $listener; private ObjectProphecy $helper; + private ObjectProphecy $updatesGenerator; private ObjectProphecy $em; private ObjectProphecy $logger; private RabbitMqOptions $options; @@ -44,16 +45,17 @@ class NotifyVisitToRabbitMqTest extends TestCase protected function setUp(): void { $this->helper = $this->prophesize(PublishingHelperInterface::class); + $this->updatesGenerator = $this->prophesize(PublishingUpdatesGeneratorInterface::class); $this->em = $this->prophesize(EntityManagerInterface::class); $this->logger = $this->prophesize(LoggerInterface::class); - $this->options = new RabbitMqOptions(['enabled' => true, 'legacy_visits_publishing' => true]); + $this->options = new RabbitMqOptions(['enabled' => true, 'legacy_visits_publishing' => false]); $this->listener = new NotifyVisitToRabbitMq( $this->helper->reveal(), + $this->updatesGenerator->reveal(), $this->em->reveal(), $this->logger->reveal(), new OrphanVisitDataTransformer(), - new ShortUrlDataTransformer(new ShortUrlStringifier([])), $this->options, ); } @@ -97,14 +99,16 @@ class NotifyVisitToRabbitMqTest extends TestCase { $visitId = '123'; $findVisit = $this->em->find(Visit::class, $visitId)->willReturn($visit); - $argumentWithExpectedChannels = Argument::that( - static fn (Update $update) => contains($expectedChannels, $update->topic), - ); + each($expectedChannels, function (string $method): void { + $this->updatesGenerator->{$method}(Argument::type(Visit::class))->willReturn( + Update::forTopicAndPayload('', []), + )->shouldBeCalledOnce(); + }); ($this->listener)(new VisitLocated($visitId)); $findVisit->shouldHaveBeenCalledOnce(); - $this->helper->publishUpdate($argumentWithExpectedChannels)->shouldHaveBeenCalledTimes( + $this->helper->publishUpdate(Argument::type(Update::class))->shouldHaveBeenCalledTimes( count($expectedChannels), ); $this->logger->debug(Argument::cetera())->shouldNotHaveBeenCalled(); @@ -114,7 +118,7 @@ class NotifyVisitToRabbitMqTest extends TestCase { $visitor = Visitor::emptyInstance(); - yield 'orphan visit' => [Visit::forBasePath($visitor), ['https://shlink.io/new-orphan-visit']]; + yield 'orphan visit' => [Visit::forBasePath($visitor), ['newOrphanVisitUpdate']]; yield 'non-orphan visit' => [ Visit::forValidShortUrl( ShortUrl::fromMeta(ShortUrlMeta::fromRawData([ @@ -123,7 +127,7 @@ class NotifyVisitToRabbitMqTest extends TestCase ])), $visitor, ), - ['https://shlink.io/new-visit', 'https://shlink.io/new-visit/bar'], + ['newShortUrlVisitUpdate', 'newVisitUpdate'], ]; } @@ -135,6 +139,9 @@ class NotifyVisitToRabbitMqTest extends TestCase { $visitId = '123'; $findVisit = $this->em->find(Visit::class, $visitId)->willReturn(Visit::forBasePath(Visitor::emptyInstance())); + $generateUpdate = $this->updatesGenerator->newOrphanVisitUpdate(Argument::type(Visit::class))->willReturn( + Update::forTopicAndPayload('', []), + ); $publish = $this->helper->publishUpdate(Argument::cetera())->willThrow($e); ($this->listener)(new VisitLocated($visitId)); @@ -144,6 +151,7 @@ class NotifyVisitToRabbitMqTest extends TestCase ['e' => $e], )->shouldHaveBeenCalledOnce(); $findVisit->shouldHaveBeenCalledOnce(); + $generateUpdate->shouldHaveBeenCalledOnce(); $publish->shouldHaveBeenCalledOnce(); } @@ -161,74 +169,81 @@ class NotifyVisitToRabbitMqTest extends TestCase public function expectedPayloadIsPublishedDependingOnConfig( bool $legacy, Visit $visit, - callable $assertPayload, + callable $assert, + callable $setup, ): void { $this->options->legacyVisitsPublishing = $legacy; $visitId = '123'; $findVisit = $this->em->find(Visit::class, $visitId)->willReturn($visit); + $setup($this->updatesGenerator); ($this->listener)(new VisitLocated($visitId)); $findVisit->shouldHaveBeenCalledOnce(); - $this->helper->publishUpdate(Argument::that($assertPayload)) - ->shouldHaveBeenCalled(); + $assert($this->helper, $this->updatesGenerator); } public function provideLegacyPayloads(): iterable { - yield 'non-legacy non-orphan visit' => [ + yield 'legacy non-orphan visit' => [ true, $visit = Visit::forValidShortUrl(ShortUrl::withLongUrl(''), Visitor::emptyInstance()), - function (Update $update) use ($visit): bool { - $payload = $update->payload; - Assert::assertEquals($payload, $visit->jsonSerialize()); - Assert::assertArrayNotHasKey('visitedUrl', $payload); - Assert::assertArrayNotHasKey('type', $payload); - Assert::assertArrayNotHasKey('visit', $payload); - Assert::assertArrayNotHasKey('shortUrl', $payload); + function (ObjectProphecy|PublishingHelperInterface $helper) use ($visit): void { + $helper->publishUpdate(Argument::that(function (Update $update) use ($visit): bool { + $payload = $update->payload; + Assert::assertEquals($payload, $visit->jsonSerialize()); + Assert::assertArrayNotHasKey('visitedUrl', $payload); + Assert::assertArrayNotHasKey('type', $payload); + Assert::assertArrayNotHasKey('visit', $payload); + Assert::assertArrayNotHasKey('shortUrl', $payload); - return true; + return true; + })); + }, + noop(...), + ]; + yield 'legacy orphan visit' => [ + true, + Visit::forBasePath(Visitor::emptyInstance()), + function (ObjectProphecy|PublishingHelperInterface $helper): void { + $helper->publishUpdate(Argument::that(function (Update $update): bool { + $payload = $update->payload; + Assert::assertArrayHasKey('visitedUrl', $payload); + Assert::assertArrayHasKey('type', $payload); + + return true; + })); + }, + noop(...), + ]; + yield 'non-legacy non-orphan visit' => [ + false, + Visit::forValidShortUrl(ShortUrl::withLongUrl(''), Visitor::emptyInstance()), + function (ObjectProphecy|PublishingHelperInterface $helper): void { + $helper->publishUpdate(Argument::type(Update::class))->shouldHaveBeenCalledTimes(2); + }, + function (ObjectProphecy|PublishingUpdatesGeneratorInterface $updatesGenerator): void { + $update = Update::forTopicAndPayload('', []); + $updatesGenerator->newOrphanVisitUpdate(Argument::cetera())->shouldNotBeCalled(); + $updatesGenerator->newVisitUpdate(Argument::cetera())->willReturn($update) + ->shouldBeCalledOnce(); + $updatesGenerator->newShortUrlVisitUpdate(Argument::cetera())->willReturn($update) + ->shouldBeCalledOnce(); }, ]; yield 'non-legacy orphan visit' => [ - true, - Visit::forBasePath(Visitor::emptyInstance()), - function (Update $update): bool { - $payload = $update->payload; - Assert::assertArrayHasKey('visitedUrl', $payload); - Assert::assertArrayHasKey('type', $payload); - - return true; - }, - ]; - yield 'legacy non-orphan visit' => [ - false, - $visit = Visit::forValidShortUrl(ShortUrl::withLongUrl(''), Visitor::emptyInstance()), - function (Update $update) use ($visit): bool { - $payload = $update->payload; - Assert::assertArrayHasKey('visit', $payload); - Assert::assertArrayHasKey('shortUrl', $payload); - Assert::assertIsArray($payload['visit']); - Assert::assertEquals($payload['visit'], $visit->jsonSerialize()); - Assert::assertArrayNotHasKey('visitedUrl', ['visit']); - Assert::assertArrayNotHasKey('type', ['visit']); - - return true; - }, - ]; - yield 'legacy orphan visit' => [ false, Visit::forBasePath(Visitor::emptyInstance()), - function (Update $update): bool { - $payload = $update->payload; - Assert::assertArrayHasKey('visit', $payload); - Assert::assertArrayNotHasKey('shortUrl', $payload); - Assert::assertIsArray($payload['visit']); - Assert::assertArrayHasKey('visitedUrl', $payload['visit']); - Assert::assertArrayHasKey('type', $payload['visit']); - - return true; + function (ObjectProphecy|PublishingHelperInterface $helper): void { + $helper->publishUpdate(Argument::type(Update::class))->shouldHaveBeenCalledOnce(); + }, + function (ObjectProphecy|PublishingUpdatesGeneratorInterface $updatesGenerator): void { + $update = Update::forTopicAndPayload('', []); + $updatesGenerator->newOrphanVisitUpdate(Argument::cetera())->willReturn($update) + ->shouldBeCalledOnce(); + $updatesGenerator->newVisitUpdate(Argument::cetera())->shouldNotBeCalled(); + $updatesGenerator->newShortUrlVisitUpdate(Argument::cetera())->shouldNotBeCalled(); }, ]; } From 26037327f9466d3d25c3b6af7067ad43f5756d41 Mon Sep 17 00:00:00 2001 From: Alejandro Celaya Date: Wed, 27 Jul 2022 18:06:47 +0200 Subject: [PATCH 13/19] Moved duplicated code in short URL listeners to an abstract class --- .../AbstractNotifyNewShortUrlListener.php | 56 +++++++++++++++++++ .../Mercure/NotifyNewShortUrlToMercure.php | 38 +++---------- .../RabbitMq/NotifyNewShortUrlToRabbitMq.php | 41 +++++--------- .../RedisPubSub/NotifyNewShortUrlToRedis.php | 41 +++++--------- .../NotifyNewShortUrlToMercureTest.php | 8 +-- .../NotifyNewShortUrlToRabbitMqTest.php | 8 +-- 6 files changed, 97 insertions(+), 95 deletions(-) create mode 100644 module/Core/src/EventDispatcher/Async/AbstractNotifyNewShortUrlListener.php diff --git a/module/Core/src/EventDispatcher/Async/AbstractNotifyNewShortUrlListener.php b/module/Core/src/EventDispatcher/Async/AbstractNotifyNewShortUrlListener.php new file mode 100644 index 00000000..1da31cd8 --- /dev/null +++ b/module/Core/src/EventDispatcher/Async/AbstractNotifyNewShortUrlListener.php @@ -0,0 +1,56 @@ +isEnabled()) { + return; + } + + $shortUrlId = $shortUrlCreated->shortUrlId; + $shortUrl = $this->em->find(ShortUrl::class, $shortUrlId); + $name = $this->getRemoteSystemName(); + + if ($shortUrl === null) { + $this->logger->warning( + 'Tried to notify {name} for new short URL with id "{shortUrlId}", but it does not exist.', + ['shortUrlId' => $shortUrlId, 'name' => $name], + ); + return; + } + + try { + $this->mercureHelper->publishUpdate($this->updatesGenerator->newShortUrlUpdate($shortUrl)); + } catch (Throwable $e) { + $this->logger->debug( + 'Error while trying to notify {name} with new short URL. {e}', + ['e' => $e, 'name' => $name], + ); + } + } + + abstract protected function isEnabled(): bool; + + abstract protected function getRemoteSystemName(): string; +} diff --git a/module/Core/src/EventDispatcher/Mercure/NotifyNewShortUrlToMercure.php b/module/Core/src/EventDispatcher/Mercure/NotifyNewShortUrlToMercure.php index fba3a57d..3790f6ff 100644 --- a/module/Core/src/EventDispatcher/Mercure/NotifyNewShortUrlToMercure.php +++ b/module/Core/src/EventDispatcher/Mercure/NotifyNewShortUrlToMercure.php @@ -4,41 +4,17 @@ declare(strict_types=1); namespace Shlinkio\Shlink\Core\EventDispatcher\Mercure; -use Doctrine\ORM\EntityManagerInterface; -use Psr\Log\LoggerInterface; -use Shlinkio\Shlink\Common\UpdatePublishing\PublishingHelperInterface; -use Shlinkio\Shlink\Core\Entity\ShortUrl; -use Shlinkio\Shlink\Core\EventDispatcher\Event\ShortUrlCreated; -use Shlinkio\Shlink\Core\EventDispatcher\PublishingUpdatesGeneratorInterface; -use Throwable; +use Shlinkio\Shlink\Core\EventDispatcher\Async\AbstractNotifyNewShortUrlListener; -class NotifyNewShortUrlToMercure +class NotifyNewShortUrlToMercure extends AbstractNotifyNewShortUrlListener { - public function __construct( - private readonly PublishingHelperInterface $mercureHelper, - private readonly PublishingUpdatesGeneratorInterface $updatesGenerator, - private readonly EntityManagerInterface $em, - private readonly LoggerInterface $logger, - ) { + protected function isEnabled(): bool + { + return true; } - public function __invoke(ShortUrlCreated $shortUrlCreated): void + protected function getRemoteSystemName(): string { - $shortUrlId = $shortUrlCreated->shortUrlId; - $shortUrl = $this->em->find(ShortUrl::class, $shortUrlId); - - if ($shortUrl === null) { - $this->logger->warning( - 'Tried to notify Mercure for new short URL with id "{shortUrlId}", but it does not exist.', - ['shortUrlId' => $shortUrlId], - ); - return; - } - - try { - $this->mercureHelper->publishUpdate($this->updatesGenerator->newShortUrlUpdate($shortUrl)); - } catch (Throwable $e) { - $this->logger->debug('Error while trying to notify mercure hub with new short URL. {e}', ['e' => $e]); - } + return 'Mercure'; } } diff --git a/module/Core/src/EventDispatcher/RabbitMq/NotifyNewShortUrlToRabbitMq.php b/module/Core/src/EventDispatcher/RabbitMq/NotifyNewShortUrlToRabbitMq.php index ad562d2a..ac95f3b0 100644 --- a/module/Core/src/EventDispatcher/RabbitMq/NotifyNewShortUrlToRabbitMq.php +++ b/module/Core/src/EventDispatcher/RabbitMq/NotifyNewShortUrlToRabbitMq.php @@ -7,44 +7,29 @@ namespace Shlinkio\Shlink\Core\EventDispatcher\RabbitMq; use Doctrine\ORM\EntityManagerInterface; use Psr\Log\LoggerInterface; use Shlinkio\Shlink\Common\UpdatePublishing\PublishingHelperInterface; -use Shlinkio\Shlink\Core\Entity\ShortUrl; -use Shlinkio\Shlink\Core\EventDispatcher\Event\ShortUrlCreated; +use Shlinkio\Shlink\Core\EventDispatcher\Async\AbstractNotifyNewShortUrlListener; use Shlinkio\Shlink\Core\EventDispatcher\PublishingUpdatesGeneratorInterface; use Shlinkio\Shlink\Core\Options\RabbitMqOptions; -use Throwable; -class NotifyNewShortUrlToRabbitMq +class NotifyNewShortUrlToRabbitMq extends AbstractNotifyNewShortUrlListener { public function __construct( - private readonly PublishingHelperInterface $rabbitMqHelper, - private readonly PublishingUpdatesGeneratorInterface $updatesGenerator, - private readonly EntityManagerInterface $em, - private readonly LoggerInterface $logger, + PublishingHelperInterface $rabbitMqHelper, + PublishingUpdatesGeneratorInterface $updatesGenerator, + EntityManagerInterface $em, + LoggerInterface $logger, private readonly RabbitMqOptions $options, ) { + parent::__construct($rabbitMqHelper, $updatesGenerator, $em, $logger); } - public function __invoke(ShortUrlCreated $shortUrlCreated): void + protected function isEnabled(): bool { - if (! $this->options->isEnabled()) { - return; - } + return $this->options->isEnabled(); + } - $shortUrlId = $shortUrlCreated->shortUrlId; - $shortUrl = $this->em->find(ShortUrl::class, $shortUrlId); - - if ($shortUrl === null) { - $this->logger->warning( - 'Tried to notify RabbitMQ for new short URL with id "{shortUrlId}", but it does not exist.', - ['shortUrlId' => $shortUrlId], - ); - return; - } - - try { - $this->rabbitMqHelper->publishUpdate($this->updatesGenerator->newShortUrlUpdate($shortUrl)); - } catch (Throwable $e) { - $this->logger->debug('Error while trying to notify RabbitMQ with new short URL. {e}', ['e' => $e]); - } + protected function getRemoteSystemName(): string + { + return 'RabbitMQ'; } } diff --git a/module/Core/src/EventDispatcher/RedisPubSub/NotifyNewShortUrlToRedis.php b/module/Core/src/EventDispatcher/RedisPubSub/NotifyNewShortUrlToRedis.php index 4a56858e..da871230 100644 --- a/module/Core/src/EventDispatcher/RedisPubSub/NotifyNewShortUrlToRedis.php +++ b/module/Core/src/EventDispatcher/RedisPubSub/NotifyNewShortUrlToRedis.php @@ -7,43 +7,28 @@ namespace Shlinkio\Shlink\Core\EventDispatcher\RedisPubSub; use Doctrine\ORM\EntityManagerInterface; use Psr\Log\LoggerInterface; use Shlinkio\Shlink\Common\UpdatePublishing\PublishingHelperInterface; -use Shlinkio\Shlink\Core\Entity\ShortUrl; -use Shlinkio\Shlink\Core\EventDispatcher\Event\ShortUrlCreated; +use Shlinkio\Shlink\Core\EventDispatcher\Async\AbstractNotifyNewShortUrlListener; use Shlinkio\Shlink\Core\EventDispatcher\PublishingUpdatesGeneratorInterface; -use Throwable; -class NotifyNewShortUrlToRedis +class NotifyNewShortUrlToRedis extends AbstractNotifyNewShortUrlListener { public function __construct( - private readonly PublishingHelperInterface $redisHelper, - private readonly PublishingUpdatesGeneratorInterface $updatesGenerator, - private readonly EntityManagerInterface $em, - private readonly LoggerInterface $logger, + PublishingHelperInterface $redisHelper, + PublishingUpdatesGeneratorInterface $updatesGenerator, + EntityManagerInterface $em, + LoggerInterface $logger, private readonly bool $enabled, ) { + parent::__construct($redisHelper, $updatesGenerator, $em, $logger); } - public function __invoke(ShortUrlCreated $shortUrlCreated): void + protected function isEnabled(): bool { - if (! $this->enabled) { - return; - } + return $this->enabled; + } - $shortUrlId = $shortUrlCreated->shortUrlId; - $shortUrl = $this->em->find(ShortUrl::class, $shortUrlId); - - if ($shortUrl === null) { - $this->logger->warning( - 'Tried to notify Redis pub/sub for new short URL with id "{shortUrlId}", but it does not exist.', - ['shortUrlId' => $shortUrlId], - ); - return; - } - - try { - $this->redisHelper->publishUpdate($this->updatesGenerator->newShortUrlUpdate($shortUrl)); - } catch (Throwable $e) { - $this->logger->debug('Error while trying to notify Redis pub/sub with new short URL. {e}', ['e' => $e]); - } + protected function getRemoteSystemName(): string + { + return 'Redis pub/sub'; } } diff --git a/module/Core/test/EventDispatcher/Mercure/NotifyNewShortUrlToMercureTest.php b/module/Core/test/EventDispatcher/Mercure/NotifyNewShortUrlToMercureTest.php index d0c89b3b..004dfd59 100644 --- a/module/Core/test/EventDispatcher/Mercure/NotifyNewShortUrlToMercureTest.php +++ b/module/Core/test/EventDispatcher/Mercure/NotifyNewShortUrlToMercureTest.php @@ -52,8 +52,8 @@ class NotifyNewShortUrlToMercureTest extends TestCase $find->shouldHaveBeenCalledOnce(); $this->logger->warning( - 'Tried to notify Mercure for new short URL with id "{shortUrlId}", but it does not exist.', - ['shortUrlId' => '123'], + 'Tried to notify {name} for new short URL with id "{shortUrlId}", but it does not exist.', + ['shortUrlId' => '123', 'name' => 'Mercure'], )->shouldHaveBeenCalledOnce(); $this->helper->publishUpdate(Argument::cetera())->shouldNotHaveBeenCalled(); $this->updatesGenerator->newShortUrlUpdate(Argument::cetera())->shouldNotHaveBeenCalled(); @@ -96,8 +96,8 @@ class NotifyNewShortUrlToMercureTest extends TestCase $publish->shouldHaveBeenCalledOnce(); $this->logger->warning(Argument::cetera())->shouldNotHaveBeenCalled(); $this->logger->debug( - 'Error while trying to notify mercure hub with new short URL. {e}', - ['e' => $e], + 'Error while trying to notify {name} with new short URL. {e}', + ['e' => $e, 'name' => 'Mercure'], )->shouldHaveBeenCalledOnce(); } } diff --git a/module/Core/test/EventDispatcher/RabbitMq/NotifyNewShortUrlToRabbitMqTest.php b/module/Core/test/EventDispatcher/RabbitMq/NotifyNewShortUrlToRabbitMqTest.php index 3bb46286..9cf44977 100644 --- a/module/Core/test/EventDispatcher/RabbitMq/NotifyNewShortUrlToRabbitMqTest.php +++ b/module/Core/test/EventDispatcher/RabbitMq/NotifyNewShortUrlToRabbitMqTest.php @@ -70,8 +70,8 @@ class NotifyNewShortUrlToRabbitMqTest extends TestCase $shortUrlId = '123'; $find = $this->em->find(ShortUrl::class, $shortUrlId)->willReturn(null); $logWarning = $this->logger->warning( - 'Tried to notify RabbitMQ for new short URL with id "{shortUrlId}", but it does not exist.', - ['shortUrlId' => $shortUrlId], + 'Tried to notify {name} for new short URL with id "{shortUrlId}", but it does not exist.', + ['shortUrlId' => $shortUrlId, 'name' => 'RabbitMQ'], ); ($this->listener)(new ShortUrlCreated($shortUrlId)); @@ -117,8 +117,8 @@ class NotifyNewShortUrlToRabbitMqTest extends TestCase ($this->listener)(new ShortUrlCreated($shortUrlId)); $this->logger->debug( - 'Error while trying to notify RabbitMQ with new short URL. {e}', - ['e' => $e], + 'Error while trying to notify {name} with new short URL. {e}', + ['e' => $e, 'name' => 'RabbitMQ'], )->shouldHaveBeenCalledOnce(); $find->shouldHaveBeenCalledOnce(); $generateUpdate->shouldHaveBeenCalledOnce(); From e36c4d397cf97418bbcca2fa23da66e991d987fa Mon Sep 17 00:00:00 2001 From: Alejandro Celaya Date: Wed, 27 Jul 2022 18:18:36 +0200 Subject: [PATCH 14/19] Moved duplicated code in visit listeners to an abstract class --- .../Async/AbstractAsyncListener.php | 12 ++++ .../AbstractNotifyNewShortUrlListener.php | 10 +-- .../Async/AbstractNotifyVisitListener.php | 72 +++++++++++++++++++ .../Mercure/NotifyVisitToMercure.php | 61 ++-------------- .../RabbitMq/NotifyVisitToRabbitMq.php | 61 ++++++---------- .../RedisPubSub/NotifyVisitToRedis.php | 57 +++------------ .../Mercure/NotifyVisitToMercureTest.php | 7 +- .../RabbitMq/NotifyVisitToRabbitMqTest.php | 8 +-- 8 files changed, 133 insertions(+), 155 deletions(-) create mode 100644 module/Core/src/EventDispatcher/Async/AbstractAsyncListener.php create mode 100644 module/Core/src/EventDispatcher/Async/AbstractNotifyVisitListener.php diff --git a/module/Core/src/EventDispatcher/Async/AbstractAsyncListener.php b/module/Core/src/EventDispatcher/Async/AbstractAsyncListener.php new file mode 100644 index 00000000..e9c78306 --- /dev/null +++ b/module/Core/src/EventDispatcher/Async/AbstractAsyncListener.php @@ -0,0 +1,12 @@ +mercureHelper->publishUpdate($this->updatesGenerator->newShortUrlUpdate($shortUrl)); + $this->publishingHelper->publishUpdate($this->updatesGenerator->newShortUrlUpdate($shortUrl)); } catch (Throwable $e) { $this->logger->debug( 'Error while trying to notify {name} with new short URL. {e}', @@ -49,8 +49,4 @@ abstract class AbstractNotifyNewShortUrlListener ); } } - - abstract protected function isEnabled(): bool; - - abstract protected function getRemoteSystemName(): string; } diff --git a/module/Core/src/EventDispatcher/Async/AbstractNotifyVisitListener.php b/module/Core/src/EventDispatcher/Async/AbstractNotifyVisitListener.php new file mode 100644 index 00000000..a2967e64 --- /dev/null +++ b/module/Core/src/EventDispatcher/Async/AbstractNotifyVisitListener.php @@ -0,0 +1,72 @@ +isEnabled()) { + return; + } + + $visitId = $visitLocated->visitId; + $visit = $this->em->find(Visit::class, $visitId); + $name = $this->getRemoteSystemName(); + + if ($visit === null) { + $this->logger->warning( + 'Tried to notify {name} for visit with id "{visitId}", but it does not exist.', + ['visitId' => $visitId, 'name' => $name], + ); + return; + } + + $updates = $this->determineUpdatesForVisit($visit); + + try { + each($updates, fn (Update $update) => $this->publishingHelper->publishUpdate($update)); + } catch (Throwable $e) { + $this->logger->debug( + 'Error while trying to notify {name} with new visit. {e}', + ['e' => $e, 'name' => $name], + ); + } + } + + /** + * @return Update[] + */ + protected function determineUpdatesForVisit(Visit $visit): array + { + if ($visit->isOrphan()) { + return [$this->updatesGenerator->newOrphanVisitUpdate($visit)]; + } + + return [ + $this->updatesGenerator->newShortUrlVisitUpdate($visit), + $this->updatesGenerator->newVisitUpdate($visit), + ]; + } +} diff --git a/module/Core/src/EventDispatcher/Mercure/NotifyVisitToMercure.php b/module/Core/src/EventDispatcher/Mercure/NotifyVisitToMercure.php index cd55fcb2..0ccd5fe9 100644 --- a/module/Core/src/EventDispatcher/Mercure/NotifyVisitToMercure.php +++ b/module/Core/src/EventDispatcher/Mercure/NotifyVisitToMercure.php @@ -4,64 +4,17 @@ declare(strict_types=1); namespace Shlinkio\Shlink\Core\EventDispatcher\Mercure; -use Doctrine\ORM\EntityManagerInterface; -use Psr\Log\LoggerInterface; -use Shlinkio\Shlink\Common\UpdatePublishing\PublishingHelperInterface; -use Shlinkio\Shlink\Common\UpdatePublishing\Update; -use Shlinkio\Shlink\Core\Entity\Visit; -use Shlinkio\Shlink\Core\EventDispatcher\Event\VisitLocated; -use Shlinkio\Shlink\Core\EventDispatcher\PublishingUpdatesGeneratorInterface; -use Throwable; +use Shlinkio\Shlink\Core\EventDispatcher\Async\AbstractNotifyVisitListener; -use function Functional\each; - -class NotifyVisitToMercure +class NotifyVisitToMercure extends AbstractNotifyVisitListener { - public function __construct( - private readonly PublishingHelperInterface $mercureHelper, - private readonly PublishingUpdatesGeneratorInterface $updatesGenerator, - private readonly EntityManagerInterface $em, - private readonly LoggerInterface $logger, - ) { + protected function isEnabled(): bool + { + return true; } - public function __invoke(VisitLocated $visitLocated): void + protected function getRemoteSystemName(): string { - $visitId = $visitLocated->visitId; - - /** @var Visit|null $visit */ - $visit = $this->em->find(Visit::class, $visitId); - if ($visit === null) { - $this->logger->warning('Tried to notify mercure for visit with id "{visitId}", but it does not exist.', [ - 'visitId' => $visitId, - ]); - return; - } - - try { - each( - $this->determineUpdatesForVisit($visit), - fn (Update $update) => $this->mercureHelper->publishUpdate($update), - ); - } catch (Throwable $e) { - $this->logger->debug('Error while trying to notify mercure hub with new visit. {e}', [ - 'e' => $e, - ]); - } - } - - /** - * @return Update[] - */ - private function determineUpdatesForVisit(Visit $visit): array - { - if ($visit->isOrphan()) { - return [$this->updatesGenerator->newOrphanVisitUpdate($visit)]; - } - - return [ - $this->updatesGenerator->newShortUrlVisitUpdate($visit), - $this->updatesGenerator->newVisitUpdate($visit), - ]; + return 'Mercure'; } } diff --git a/module/Core/src/EventDispatcher/RabbitMq/NotifyVisitToRabbitMq.php b/module/Core/src/EventDispatcher/RabbitMq/NotifyVisitToRabbitMq.php index 51c9f423..2bf74bb2 100644 --- a/module/Core/src/EventDispatcher/RabbitMq/NotifyVisitToRabbitMq.php +++ b/module/Core/src/EventDispatcher/RabbitMq/NotifyVisitToRabbitMq.php @@ -10,55 +10,28 @@ use Shlinkio\Shlink\Common\Rest\DataTransformerInterface; use Shlinkio\Shlink\Common\UpdatePublishing\PublishingHelperInterface; use Shlinkio\Shlink\Common\UpdatePublishing\Update; use Shlinkio\Shlink\Core\Entity\Visit; -use Shlinkio\Shlink\Core\EventDispatcher\Event\VisitLocated; +use Shlinkio\Shlink\Core\EventDispatcher\Async\AbstractNotifyVisitListener; use Shlinkio\Shlink\Core\EventDispatcher\PublishingUpdatesGeneratorInterface; use Shlinkio\Shlink\Core\EventDispatcher\Topic; use Shlinkio\Shlink\Core\Options\RabbitMqOptions; -use Throwable; -use function Functional\each; - -class NotifyVisitToRabbitMq +class NotifyVisitToRabbitMq extends AbstractNotifyVisitListener { public function __construct( - private readonly PublishingHelperInterface $rabbitMqHelper, - private readonly PublishingUpdatesGeneratorInterface $updatesGenerator, - private readonly EntityManagerInterface $em, - private readonly LoggerInterface $logger, + PublishingHelperInterface $rabbitMqHelper, + PublishingUpdatesGeneratorInterface $updatesGenerator, + EntityManagerInterface $em, + LoggerInterface $logger, private readonly DataTransformerInterface $orphanVisitTransformer, private readonly RabbitMqOptions $options, ) { - } - - public function __invoke(VisitLocated $visitLocated): void - { - if (! $this->options->isEnabled()) { - return; - } - - $visitId = $visitLocated->visitId; - $visit = $this->em->find(Visit::class, $visitId); - - if ($visit === null) { - $this->logger->warning('Tried to notify RabbitMQ for visit with id "{visitId}", but it does not exist.', [ - 'visitId' => $visitId, - ]); - return; - } - - $updates = $this->determineUpdatesForVisit($visit); - - try { - each($updates, fn (Update $update) => $this->rabbitMqHelper->publishUpdate($update)); - } catch (Throwable $e) { - $this->logger->debug('Error while trying to notify RabbitMQ with new visit. {e}', ['e' => $e]); - } + parent::__construct($rabbitMqHelper, $updatesGenerator, $em, $logger); } /** * @return Update[] */ - private function determineUpdatesForVisit(Visit $visit): array + protected function determineUpdatesForVisit(Visit $visit): array { return match (true) { // This was defined incorrectly. @@ -79,12 +52,18 @@ class NotifyVisitToRabbitMq ), ], - // Once the two deprecated cases above have been remove, replace this with a simple "if" and early return. - $visit->isOrphan() => [$this->updatesGenerator->newOrphanVisitUpdate($visit)], - default => [ - $this->updatesGenerator->newShortUrlVisitUpdate($visit), - $this->updatesGenerator->newVisitUpdate($visit), - ], + // Once the two deprecated cases above have been remove, make parent method private + default => parent::determineUpdatesForVisit($visit), }; } + + protected function isEnabled(): bool + { + return $this->options->isEnabled(); + } + + protected function getRemoteSystemName(): string + { + return 'RabbitMQ'; + } } diff --git a/module/Core/src/EventDispatcher/RedisPubSub/NotifyVisitToRedis.php b/module/Core/src/EventDispatcher/RedisPubSub/NotifyVisitToRedis.php index 820921db..8b54eff0 100644 --- a/module/Core/src/EventDispatcher/RedisPubSub/NotifyVisitToRedis.php +++ b/module/Core/src/EventDispatcher/RedisPubSub/NotifyVisitToRedis.php @@ -7,63 +7,28 @@ namespace Shlinkio\Shlink\Core\EventDispatcher\RedisPubSub; use Doctrine\ORM\EntityManagerInterface; use Psr\Log\LoggerInterface; use Shlinkio\Shlink\Common\UpdatePublishing\PublishingHelperInterface; -use Shlinkio\Shlink\Common\UpdatePublishing\Update; -use Shlinkio\Shlink\Core\Entity\Visit; -use Shlinkio\Shlink\Core\EventDispatcher\Event\VisitLocated; +use Shlinkio\Shlink\Core\EventDispatcher\Async\AbstractNotifyVisitListener; use Shlinkio\Shlink\Core\EventDispatcher\PublishingUpdatesGeneratorInterface; -use Throwable; -use function Functional\each; - -class NotifyVisitToRedis +class NotifyVisitToRedis extends AbstractNotifyVisitListener { public function __construct( - private readonly PublishingHelperInterface $redisHelper, - private readonly PublishingUpdatesGeneratorInterface $updatesGenerator, - private readonly EntityManagerInterface $em, - private readonly LoggerInterface $logger, + PublishingHelperInterface $redisHelper, + PublishingUpdatesGeneratorInterface $updatesGenerator, + EntityManagerInterface $em, + LoggerInterface $logger, private readonly bool $enabled, ) { + parent::__construct($redisHelper, $updatesGenerator, $em, $logger); } - public function __invoke(VisitLocated $visitLocated): void + protected function isEnabled(): bool { - if (! $this->enabled) { - return; - } - - $visitId = $visitLocated->visitId; - $visit = $this->em->find(Visit::class, $visitId); - - if ($visit === null) { - $this->logger->warning( - 'Tried to notify Redis pub/sub for visit with id "{visitId}", but it does not exist.', - ['visitId' => $visitId], - ); - return; - } - - $updates = $this->determineUpdatesForVisit($visit); - - try { - each($updates, fn (Update $update) => $this->redisHelper->publishUpdate($update)); - } catch (Throwable $e) { - $this->logger->debug('Error while trying to notify Redis pub/sub with new visit. {e}', ['e' => $e]); - } + return $this->enabled; } - /** - * @return Update[] - */ - private function determineUpdatesForVisit(Visit $visit): array + protected function getRemoteSystemName(): string { - if ($visit->isOrphan()) { - return [$this->updatesGenerator->newOrphanVisitUpdate($visit)]; - } - - return [ - $this->updatesGenerator->newShortUrlVisitUpdate($visit), - $this->updatesGenerator->newVisitUpdate($visit), - ]; + return 'Redis pub/sub'; } } diff --git a/module/Core/test/EventDispatcher/Mercure/NotifyVisitToMercureTest.php b/module/Core/test/EventDispatcher/Mercure/NotifyVisitToMercureTest.php index 65049f49..1ce29d0d 100644 --- a/module/Core/test/EventDispatcher/Mercure/NotifyVisitToMercureTest.php +++ b/module/Core/test/EventDispatcher/Mercure/NotifyVisitToMercureTest.php @@ -52,8 +52,8 @@ class NotifyVisitToMercureTest extends TestCase $visitId = '123'; $findVisit = $this->em->find(Visit::class, $visitId)->willReturn(null); $logWarning = $this->logger->warning( - 'Tried to notify mercure for visit with id "{visitId}", but it does not exist.', - ['visitId' => $visitId], + 'Tried to notify {name} for visit with id "{visitId}", but it does not exist.', + ['visitId' => $visitId, 'name' => 'Mercure'], ); $logDebug = $this->logger->debug(Argument::cetera()); $buildNewShortUrlVisitUpdate = $this->updatesGenerator->newShortUrlVisitUpdate( @@ -110,8 +110,9 @@ class NotifyVisitToMercureTest extends TestCase $findVisit = $this->em->find(Visit::class, $visitId)->willReturn($visit); $logWarning = $this->logger->warning(Argument::cetera()); - $logDebug = $this->logger->debug('Error while trying to notify mercure hub with new visit. {e}', [ + $logDebug = $this->logger->debug('Error while trying to notify {name} with new visit. {e}', [ 'e' => $e, + 'name' => 'Mercure', ]); $buildNewShortUrlVisitUpdate = $this->updatesGenerator->newShortUrlVisitUpdate($visit)->willReturn($update); $buildNewOrphanVisitUpdate = $this->updatesGenerator->newOrphanVisitUpdate($visit)->willReturn($update); diff --git a/module/Core/test/EventDispatcher/RabbitMq/NotifyVisitToRabbitMqTest.php b/module/Core/test/EventDispatcher/RabbitMq/NotifyVisitToRabbitMqTest.php index f2e31072..05ee7568 100644 --- a/module/Core/test/EventDispatcher/RabbitMq/NotifyVisitToRabbitMqTest.php +++ b/module/Core/test/EventDispatcher/RabbitMq/NotifyVisitToRabbitMqTest.php @@ -79,8 +79,8 @@ class NotifyVisitToRabbitMqTest extends TestCase $visitId = '123'; $findVisit = $this->em->find(Visit::class, $visitId)->willReturn(null); $logWarning = $this->logger->warning( - 'Tried to notify RabbitMQ for visit with id "{visitId}", but it does not exist.', - ['visitId' => $visitId], + 'Tried to notify {name} for visit with id "{visitId}", but it does not exist.', + ['visitId' => $visitId, 'name' => 'RabbitMQ'], ); ($this->listener)(new VisitLocated($visitId)); @@ -147,8 +147,8 @@ class NotifyVisitToRabbitMqTest extends TestCase ($this->listener)(new VisitLocated($visitId)); $this->logger->debug( - 'Error while trying to notify RabbitMQ with new visit. {e}', - ['e' => $e], + 'Error while trying to notify {name} with new visit. {e}', + ['e' => $e, 'name' => 'RabbitMQ'], )->shouldHaveBeenCalledOnce(); $findVisit->shouldHaveBeenCalledOnce(); $generateUpdate->shouldHaveBeenCalledOnce(); From 4cf433a994d36eab5b3caa9dc0bc31e0b856f267 Mon Sep 17 00:00:00 2001 From: Alejandro Celaya Date: Thu, 28 Jul 2022 10:25:55 +0200 Subject: [PATCH 15/19] Defined enum with supported remote systems --- .../EventDispatcher/Async/AbstractAsyncListener.php | 2 +- .../Async/AbstractNotifyNewShortUrlListener.php | 2 +- .../Async/AbstractNotifyVisitListener.php | 2 +- .../Core/src/EventDispatcher/Async/RemoteSystem.php | 12 ++++++++++++ .../Mercure/NotifyNewShortUrlToMercure.php | 5 +++-- .../EventDispatcher/Mercure/NotifyVisitToMercure.php | 5 +++-- .../RabbitMq/NotifyNewShortUrlToRabbitMq.php | 5 +++-- .../RabbitMq/NotifyVisitToRabbitMq.php | 5 +++-- .../RedisPubSub/NotifyNewShortUrlToRedis.php | 5 +++-- .../RedisPubSub/NotifyVisitToRedis.php | 5 +++-- 10 files changed, 33 insertions(+), 15 deletions(-) create mode 100644 module/Core/src/EventDispatcher/Async/RemoteSystem.php diff --git a/module/Core/src/EventDispatcher/Async/AbstractAsyncListener.php b/module/Core/src/EventDispatcher/Async/AbstractAsyncListener.php index e9c78306..ae8391db 100644 --- a/module/Core/src/EventDispatcher/Async/AbstractAsyncListener.php +++ b/module/Core/src/EventDispatcher/Async/AbstractAsyncListener.php @@ -8,5 +8,5 @@ abstract class AbstractAsyncListener { abstract protected function isEnabled(): bool; - abstract protected function getRemoteSystemName(): string; + abstract protected function getRemoteSystem(): RemoteSystem; } diff --git a/module/Core/src/EventDispatcher/Async/AbstractNotifyNewShortUrlListener.php b/module/Core/src/EventDispatcher/Async/AbstractNotifyNewShortUrlListener.php index eab93209..ab435577 100644 --- a/module/Core/src/EventDispatcher/Async/AbstractNotifyNewShortUrlListener.php +++ b/module/Core/src/EventDispatcher/Async/AbstractNotifyNewShortUrlListener.php @@ -30,7 +30,7 @@ abstract class AbstractNotifyNewShortUrlListener extends AbstractAsyncListener $shortUrlId = $shortUrlCreated->shortUrlId; $shortUrl = $this->em->find(ShortUrl::class, $shortUrlId); - $name = $this->getRemoteSystemName(); + $name = $this->getRemoteSystem()->value; if ($shortUrl === null) { $this->logger->warning( diff --git a/module/Core/src/EventDispatcher/Async/AbstractNotifyVisitListener.php b/module/Core/src/EventDispatcher/Async/AbstractNotifyVisitListener.php index a2967e64..5852b032 100644 --- a/module/Core/src/EventDispatcher/Async/AbstractNotifyVisitListener.php +++ b/module/Core/src/EventDispatcher/Async/AbstractNotifyVisitListener.php @@ -33,7 +33,7 @@ abstract class AbstractNotifyVisitListener extends AbstractAsyncListener $visitId = $visitLocated->visitId; $visit = $this->em->find(Visit::class, $visitId); - $name = $this->getRemoteSystemName(); + $name = $this->getRemoteSystem()->value; if ($visit === null) { $this->logger->warning( diff --git a/module/Core/src/EventDispatcher/Async/RemoteSystem.php b/module/Core/src/EventDispatcher/Async/RemoteSystem.php new file mode 100644 index 00000000..2cdda1d9 --- /dev/null +++ b/module/Core/src/EventDispatcher/Async/RemoteSystem.php @@ -0,0 +1,12 @@ +options->isEnabled(); } - protected function getRemoteSystemName(): string + protected function getRemoteSystem(): RemoteSystem { - return 'RabbitMQ'; + return RemoteSystem::RABBIT_MQ; } } diff --git a/module/Core/src/EventDispatcher/RabbitMq/NotifyVisitToRabbitMq.php b/module/Core/src/EventDispatcher/RabbitMq/NotifyVisitToRabbitMq.php index 2bf74bb2..fe777c69 100644 --- a/module/Core/src/EventDispatcher/RabbitMq/NotifyVisitToRabbitMq.php +++ b/module/Core/src/EventDispatcher/RabbitMq/NotifyVisitToRabbitMq.php @@ -11,6 +11,7 @@ use Shlinkio\Shlink\Common\UpdatePublishing\PublishingHelperInterface; use Shlinkio\Shlink\Common\UpdatePublishing\Update; use Shlinkio\Shlink\Core\Entity\Visit; use Shlinkio\Shlink\Core\EventDispatcher\Async\AbstractNotifyVisitListener; +use Shlinkio\Shlink\Core\EventDispatcher\Async\RemoteSystem; use Shlinkio\Shlink\Core\EventDispatcher\PublishingUpdatesGeneratorInterface; use Shlinkio\Shlink\Core\EventDispatcher\Topic; use Shlinkio\Shlink\Core\Options\RabbitMqOptions; @@ -62,8 +63,8 @@ class NotifyVisitToRabbitMq extends AbstractNotifyVisitListener return $this->options->isEnabled(); } - protected function getRemoteSystemName(): string + protected function getRemoteSystem(): RemoteSystem { - return 'RabbitMQ'; + return RemoteSystem::RABBIT_MQ; } } diff --git a/module/Core/src/EventDispatcher/RedisPubSub/NotifyNewShortUrlToRedis.php b/module/Core/src/EventDispatcher/RedisPubSub/NotifyNewShortUrlToRedis.php index da871230..5cee9d5e 100644 --- a/module/Core/src/EventDispatcher/RedisPubSub/NotifyNewShortUrlToRedis.php +++ b/module/Core/src/EventDispatcher/RedisPubSub/NotifyNewShortUrlToRedis.php @@ -8,6 +8,7 @@ use Doctrine\ORM\EntityManagerInterface; use Psr\Log\LoggerInterface; use Shlinkio\Shlink\Common\UpdatePublishing\PublishingHelperInterface; use Shlinkio\Shlink\Core\EventDispatcher\Async\AbstractNotifyNewShortUrlListener; +use Shlinkio\Shlink\Core\EventDispatcher\Async\RemoteSystem; use Shlinkio\Shlink\Core\EventDispatcher\PublishingUpdatesGeneratorInterface; class NotifyNewShortUrlToRedis extends AbstractNotifyNewShortUrlListener @@ -27,8 +28,8 @@ class NotifyNewShortUrlToRedis extends AbstractNotifyNewShortUrlListener return $this->enabled; } - protected function getRemoteSystemName(): string + protected function getRemoteSystem(): RemoteSystem { - return 'Redis pub/sub'; + return RemoteSystem::REDIS_PUB_SUB; } } diff --git a/module/Core/src/EventDispatcher/RedisPubSub/NotifyVisitToRedis.php b/module/Core/src/EventDispatcher/RedisPubSub/NotifyVisitToRedis.php index 8b54eff0..ae349495 100644 --- a/module/Core/src/EventDispatcher/RedisPubSub/NotifyVisitToRedis.php +++ b/module/Core/src/EventDispatcher/RedisPubSub/NotifyVisitToRedis.php @@ -8,6 +8,7 @@ use Doctrine\ORM\EntityManagerInterface; use Psr\Log\LoggerInterface; use Shlinkio\Shlink\Common\UpdatePublishing\PublishingHelperInterface; use Shlinkio\Shlink\Core\EventDispatcher\Async\AbstractNotifyVisitListener; +use Shlinkio\Shlink\Core\EventDispatcher\Async\RemoteSystem; use Shlinkio\Shlink\Core\EventDispatcher\PublishingUpdatesGeneratorInterface; class NotifyVisitToRedis extends AbstractNotifyVisitListener @@ -27,8 +28,8 @@ class NotifyVisitToRedis extends AbstractNotifyVisitListener return $this->enabled; } - protected function getRemoteSystemName(): string + protected function getRemoteSystem(): RemoteSystem { - return 'Redis pub/sub'; + return RemoteSystem::REDIS_PUB_SUB; } } From 20a6e7e21010c086108eafd45faed286beb44eda Mon Sep 17 00:00:00 2001 From: Alejandro Celaya Date: Thu, 28 Jul 2022 10:33:26 +0200 Subject: [PATCH 16/19] Created NotifyNewShortUrlToRedisTest --- .../NotifyNewShortUrlToRedisTest.php | 95 +++++++++++++++++++ 1 file changed, 95 insertions(+) create mode 100644 module/Core/test/EventDispatcher/RedisPubSub/NotifyNewShortUrlToRedisTest.php diff --git a/module/Core/test/EventDispatcher/RedisPubSub/NotifyNewShortUrlToRedisTest.php b/module/Core/test/EventDispatcher/RedisPubSub/NotifyNewShortUrlToRedisTest.php new file mode 100644 index 00000000..d5fa8b8c --- /dev/null +++ b/module/Core/test/EventDispatcher/RedisPubSub/NotifyNewShortUrlToRedisTest.php @@ -0,0 +1,95 @@ +helper = $this->prophesize(PublishingHelperInterface::class); + $this->updatesGenerator = $this->prophesize(PublishingUpdatesGeneratorInterface::class); + $this->em = $this->prophesize(EntityManagerInterface::class); + $this->logger = $this->prophesize(LoggerInterface::class); + } + + /** @test */ + public function doesNothingWhenTheFeatureIsNotEnabled(): void + { + $this->createListener(false)(new ShortUrlCreated('123')); + + $this->em->find(Argument::cetera())->shouldNotHaveBeenCalled(); + $this->logger->warning(Argument::cetera())->shouldNotHaveBeenCalled(); + $this->logger->debug(Argument::cetera())->shouldNotHaveBeenCalled(); + $this->helper->publishUpdate(Argument::cetera())->shouldNotHaveBeenCalled(); + } + + /** + * @test + * @dataProvider provideExceptions + */ + public function printsDebugMessageInCaseOfError(Throwable $e): void + { + $shortUrlId = '123'; + $update = Update::forTopicAndPayload(Topic::NEW_SHORT_URL->value, []); + $find = $this->em->find(ShortUrl::class, $shortUrlId)->willReturn(ShortUrl::withLongUrl('')); + $generateUpdate = $this->updatesGenerator->newShortUrlUpdate(Argument::type(ShortUrl::class))->willReturn( + $update, + ); + $publish = $this->helper->publishUpdate($update)->willThrow($e); + + $this->createListener()(new ShortUrlCreated($shortUrlId)); + + $this->logger->debug( + 'Error while trying to notify {name} with new short URL. {e}', + ['e' => $e, 'name' => 'Redis pub/sub'], + )->shouldHaveBeenCalledOnce(); + $find->shouldHaveBeenCalledOnce(); + $generateUpdate->shouldHaveBeenCalledOnce(); + $publish->shouldHaveBeenCalledOnce(); + } + + public function provideExceptions(): iterable + { + yield [new RuntimeException('RuntimeException Error')]; + yield [new Exception('Exception Error')]; + yield [new DomainException('DomainException Error')]; + } + + private function createListener(bool $enabled = true): NotifyNewShortUrlToRedis + { + return new NotifyNewShortUrlToRedis( + $this->helper->reveal(), + $this->updatesGenerator->reveal(), + $this->em->reveal(), + $this->logger->reveal(), + $enabled, + ); + } +} From 73ae754aa76b973336dcffe4727c8cf367f42399 Mon Sep 17 00:00:00 2001 From: Alejandro Celaya Date: Thu, 28 Jul 2022 10:36:52 +0200 Subject: [PATCH 17/19] Created NotifyVisitToRedisTest --- .../RedisPubSub/NotifyVisitToRedisTest.php | 94 +++++++++++++++++++ 1 file changed, 94 insertions(+) create mode 100644 module/Core/test/EventDispatcher/RedisPubSub/NotifyVisitToRedisTest.php diff --git a/module/Core/test/EventDispatcher/RedisPubSub/NotifyVisitToRedisTest.php b/module/Core/test/EventDispatcher/RedisPubSub/NotifyVisitToRedisTest.php new file mode 100644 index 00000000..3beaa838 --- /dev/null +++ b/module/Core/test/EventDispatcher/RedisPubSub/NotifyVisitToRedisTest.php @@ -0,0 +1,94 @@ +helper = $this->prophesize(PublishingHelperInterface::class); + $this->updatesGenerator = $this->prophesize(PublishingUpdatesGeneratorInterface::class); + $this->em = $this->prophesize(EntityManagerInterface::class); + $this->logger = $this->prophesize(LoggerInterface::class); + } + + /** @test */ + public function doesNothingWhenTheFeatureIsNotEnabled(): void + { + $this->createListener(false)(new VisitLocated('123')); + + $this->em->find(Argument::cetera())->shouldNotHaveBeenCalled(); + $this->logger->warning(Argument::cetera())->shouldNotHaveBeenCalled(); + $this->logger->debug(Argument::cetera())->shouldNotHaveBeenCalled(); + $this->helper->publishUpdate(Argument::cetera())->shouldNotHaveBeenCalled(); + } + + /** + * @test + * @dataProvider provideExceptions + */ + public function printsDebugMessageInCaseOfError(Throwable $e): void + { + $visitId = '123'; + $findVisit = $this->em->find(Visit::class, $visitId)->willReturn(Visit::forBasePath(Visitor::emptyInstance())); + $generateUpdate = $this->updatesGenerator->newOrphanVisitUpdate(Argument::type(Visit::class))->willReturn( + Update::forTopicAndPayload('', []), + ); + $publish = $this->helper->publishUpdate(Argument::cetera())->willThrow($e); + + $this->createListener()(new VisitLocated($visitId)); + + $this->logger->debug( + 'Error while trying to notify {name} with new visit. {e}', + ['e' => $e, 'name' => 'Redis pub/sub'], + )->shouldHaveBeenCalledOnce(); + $findVisit->shouldHaveBeenCalledOnce(); + $generateUpdate->shouldHaveBeenCalledOnce(); + $publish->shouldHaveBeenCalledOnce(); + } + + public function provideExceptions(): iterable + { + yield [new RuntimeException('RuntimeException Error')]; + yield [new Exception('Exception Error')]; + yield [new DomainException('DomainException Error')]; + } + + private function createListener(bool $enabled = true): NotifyVisitToRedis + { + return new NotifyVisitToRedis( + $this->helper->reveal(), + $this->updatesGenerator->reveal(), + $this->em->reveal(), + $this->logger->reveal(), + $enabled, + ); + } +} From 3289968a933475be812486cc0d5e26751be6e286 Mon Sep 17 00:00:00 2001 From: Alejandro Celaya Date: Thu, 28 Jul 2022 10:46:24 +0200 Subject: [PATCH 18/19] Updated changelog --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index bd8a5e57..27baca9d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com), and this You can now subscribe to the `https://shlink.io/new-short-url` topic on any of the supported async updates technologies in order to get notified when a short URL is created. +* [#1367](https://github.com/shlinkio/shlink/issues/1367) Added support to publish real-time updates in redis pub/sub. + + The publishing will happen in the same redis instance/cluster configured for caching. + ### Changed * [#1452](https://github.com/shlinkio/shlink/issues/1452) Updated to monolog 3 * [#1485](https://github.com/shlinkio/shlink/issues/1485) Changed payload published in RabbitMQ for all visits events, in order to conform with the Async API spec. From 8c2bdfba1c0f81e03527e4a695106688efb1ae2a Mon Sep 17 00:00:00 2001 From: Alejandro Celaya Date: Thu, 28 Jul 2022 10:51:48 +0200 Subject: [PATCH 19/19] Refactored match to ifs with eary returns --- .../RabbitMq/NotifyVisitToRabbitMq.php | 27 ++++++++++--------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/module/Core/src/EventDispatcher/RabbitMq/NotifyVisitToRabbitMq.php b/module/Core/src/EventDispatcher/RabbitMq/NotifyVisitToRabbitMq.php index fe777c69..0faa795c 100644 --- a/module/Core/src/EventDispatcher/RabbitMq/NotifyVisitToRabbitMq.php +++ b/module/Core/src/EventDispatcher/RabbitMq/NotifyVisitToRabbitMq.php @@ -34,28 +34,29 @@ class NotifyVisitToRabbitMq extends AbstractNotifyVisitListener */ protected function determineUpdatesForVisit(Visit $visit): array { - return match (true) { - // This was defined incorrectly. - // According to the spec, both the visit and the short URL it belongs to, should be published. - // The shape should be ['visit' => [...], 'shortUrl' => ?[...]] - // However, this would be a breaking change, so we need a flag that determines the shape of the payload. - $this->options->legacyVisitsPublishing() && $visit->isOrphan() => [ + // Once the two deprecated cases below have been removed, make parent method private + if (! $this->options->legacyVisitsPublishing()) { + return parent::determineUpdatesForVisit($visit); + } + + // This was defined incorrectly. + // According to the spec, both the visit and the short URL it belongs to, should be published. + // The shape should be ['visit' => [...], 'shortUrl' => ?[...]] + // However, this would be a breaking change, so we need a flag that determines the shape of the payload. + return $visit->isOrphan() + ? [ Update::forTopicAndPayload( Topic::NEW_ORPHAN_VISIT->value, $this->orphanVisitTransformer->transform($visit), ), - ], - $this->options->legacyVisitsPublishing() && ! $visit->isOrphan() => [ + ] + : [ Update::forTopicAndPayload(Topic::NEW_VISIT->value, $visit->jsonSerialize()), Update::forTopicAndPayload( Topic::newShortUrlVisit($visit->getShortUrl()?->getShortCode()), $visit->jsonSerialize(), ), - ], - - // Once the two deprecated cases above have been remove, make parent method private - default => parent::determineUpdatesForVisit($visit), - }; + ]; } protected function isEnabled(): bool