mirror of
https://github.com/shlinkio/shlink.git
synced 2026-02-28 12:13:13 +08:00
Merge pull request #1488 from acelaya-forks/feature/redis-pub-sub
Feature/redis pub sub
This commit is contained in:
@@ -14,6 +14,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com), and this
|
||||
|
||||
You can now subscribe to the `https://shlink.io/new-short-url` topic on any of the supported async updates technologies in order to get notified when a short URL is created.
|
||||
|
||||
* [#1367](https://github.com/shlinkio/shlink/issues/1367) Added support to publish real-time updates in redis pub/sub.
|
||||
|
||||
The publishing will happen in the same redis instance/cluster configured for caching.
|
||||
|
||||
### Changed
|
||||
* [#1452](https://github.com/shlinkio/shlink/issues/1452) Updated to monolog 3
|
||||
* [#1485](https://github.com/shlinkio/shlink/issues/1485) Changed payload published in RabbitMQ for all visits events, in order to conform with the Async API spec.
|
||||
|
||||
@@ -36,26 +36,24 @@
|
||||
"mezzio/mezzio": "^3.7",
|
||||
"mezzio/mezzio-fastroute": "^3.3",
|
||||
"mezzio/mezzio-problem-details": "^1.5",
|
||||
"mezzio/mezzio-swoole": "^4.0",
|
||||
"mezzio/mezzio-swoole": "^4.3",
|
||||
"mlocati/ip-lib": "^1.17",
|
||||
"ocramius/proxy-manager": "^2.11",
|
||||
"pagerfanta/core": "^3.5",
|
||||
"php-middleware/request-id": "^4.1",
|
||||
"predis/predis": "^1.1",
|
||||
"pugx/shortid-php": "^1.0",
|
||||
"ramsey/uuid": "^4.2",
|
||||
"shlinkio/shlink-common": "dev-main#0396706 as 4.5",
|
||||
"shlinkio/shlink-common": "dev-main#b3848ad as 4.5",
|
||||
"shlinkio/shlink-config": "^1.6",
|
||||
"shlinkio/shlink-event-dispatcher": "^2.4",
|
||||
"shlinkio/shlink-importer": "^3.0",
|
||||
"shlinkio/shlink-installer": "^7.1",
|
||||
"shlinkio/shlink-installer": "dev-develop#f76e9aa as 7.2",
|
||||
"shlinkio/shlink-ip-geolocation": "^2.2",
|
||||
"symfony/console": "^6.0",
|
||||
"symfony/filesystem": "^6.0",
|
||||
"symfony/lock": "^6.0",
|
||||
"symfony/mercure": "^0.6",
|
||||
"symfony/process": "^6.0",
|
||||
"symfony/string": "^6.0"
|
||||
"symfony/console": "^6.1",
|
||||
"symfony/filesystem": "^6.1",
|
||||
"symfony/lock": "^6.1",
|
||||
"symfony/process": "^6.1",
|
||||
"symfony/string": "^6.1"
|
||||
},
|
||||
"require-dev": {
|
||||
"cebe/php-openapi": "^1.7",
|
||||
@@ -72,7 +70,7 @@
|
||||
"roave/security-advisories": "dev-master",
|
||||
"shlinkio/php-coding-standard": "~2.3.0",
|
||||
"shlinkio/shlink-test-utils": "^3.0.1",
|
||||
"symfony/var-dumper": "^6.0",
|
||||
"symfony/var-dumper": "^6.1",
|
||||
"veewee/composer-run-parallel": "^1.1"
|
||||
},
|
||||
"autoload": {
|
||||
|
||||
@@ -32,6 +32,7 @@ return [
|
||||
Option\Worker\WebWorkerNumConfigOption::class,
|
||||
Option\Redis\RedisServersConfigOption::class,
|
||||
Option\Redis\RedisSentinelServiceConfigOption::class,
|
||||
Option\Redis\RedisPubSubConfigOption::class,
|
||||
Option\UrlShortener\ShortCodeLengthOption::class,
|
||||
Option\Mercure\EnableMercureConfigOption::class,
|
||||
Option\Mercure\MercurePublicUrlConfigOption::class,
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
declare(strict_types=1);
|
||||
|
||||
use Laminas\ServiceManager\AbstractFactory\ConfigAbstractFactory;
|
||||
use Predis\ClientInterface as PredisClient;
|
||||
use Shlinkio\Shlink\Common\Cache\RedisFactory;
|
||||
use Shlinkio\Shlink\Common\Logger\LoggerAwareDelegatorFactory;
|
||||
use Shlinkio\Shlink\Core\Config\EnvVars;
|
||||
use Symfony\Component\Lock;
|
||||
@@ -38,7 +38,7 @@ return [
|
||||
|
||||
ConfigAbstractFactory::class => [
|
||||
Lock\Store\FlockStore::class => ['config.locks.locks_dir'],
|
||||
Lock\Store\RedisStore::class => [PredisClient::class],
|
||||
Lock\Store\RedisStore::class => [RedisFactory::SERVICE_NAME],
|
||||
Lock\LockFactory::class => ['lock_store'],
|
||||
LOCAL_LOCK_FACTORY => ['local_lock_store'],
|
||||
],
|
||||
|
||||
@@ -6,9 +6,14 @@ use Shlinkio\Shlink\Core\Config\EnvVars;
|
||||
|
||||
return (static function (): array {
|
||||
$redisServers = EnvVars::REDIS_SERVERS->loadFromEnv();
|
||||
$pubSub = [
|
||||
'redis' => [
|
||||
'pub_sub_enabled' => $redisServers !== null && EnvVars::REDIS_PUB_SUB_ENABLED->loadFromEnv(false),
|
||||
],
|
||||
];
|
||||
|
||||
return match ($redisServers) {
|
||||
null => [],
|
||||
null => $pubSub,
|
||||
default => [
|
||||
'cache' => [
|
||||
'redis' => [
|
||||
@@ -16,6 +21,7 @@ return (static function (): array {
|
||||
'sentinel_service' => EnvVars::REDIS_SENTINEL_SERVICE->loadFromEnv(),
|
||||
],
|
||||
],
|
||||
...$pubSub,
|
||||
],
|
||||
};
|
||||
})();
|
||||
|
||||
@@ -7,12 +7,13 @@ return [
|
||||
'cache' => [
|
||||
'redis' => [
|
||||
'servers' => 'tcp://shlink_redis:6379',
|
||||
// 'servers' => [
|
||||
// 'tcp://shlink_redis:6379',
|
||||
// ],
|
||||
],
|
||||
],
|
||||
|
||||
'redis' => [
|
||||
'pub_sub_enabled' => true,
|
||||
],
|
||||
|
||||
'dependencies' => [
|
||||
'aliases' => [
|
||||
// With this config, a user could alias 'lock_store' => 'redis_lock_store' to override the default
|
||||
|
||||
@@ -64,7 +64,7 @@ return [
|
||||
ShortUrl\Transformer\ShortUrlDataTransformer::class => ConfigAbstractFactory::class,
|
||||
ShortUrl\Middleware\ExtraPathRedirectMiddleware::class => ConfigAbstractFactory::class,
|
||||
|
||||
Mercure\MercureUpdatesGenerator::class => ConfigAbstractFactory::class,
|
||||
EventDispatcher\PublishingUpdatesGenerator::class => ConfigAbstractFactory::class,
|
||||
|
||||
Importer\ImportedLinksProcessor::class => ConfigAbstractFactory::class,
|
||||
|
||||
@@ -160,7 +160,7 @@ return [
|
||||
Options\UrlShortenerOptions::class,
|
||||
],
|
||||
|
||||
Mercure\MercureUpdatesGenerator::class => [
|
||||
EventDispatcher\PublishingUpdatesGenerator::class => [
|
||||
ShortUrl\Transformer\ShortUrlDataTransformer::class,
|
||||
Visit\Transformer\OrphanVisitDataTransformer::class,
|
||||
],
|
||||
|
||||
@@ -7,10 +7,11 @@ namespace Shlinkio\Shlink\Core;
|
||||
use Laminas\ServiceManager\AbstractFactory\ConfigAbstractFactory;
|
||||
use Psr\EventDispatcher\EventDispatcherInterface;
|
||||
use Shlinkio\Shlink\CLI\Util\GeolocationDbUpdater;
|
||||
use Shlinkio\Shlink\Common\Cache\RedisPublishingHelper;
|
||||
use Shlinkio\Shlink\Common\Mercure\MercureHubPublishingHelper;
|
||||
use Shlinkio\Shlink\Common\RabbitMq\RabbitMqPublishingHelper;
|
||||
use Shlinkio\Shlink\IpGeolocation\GeoLite2\DbUpdater;
|
||||
use Shlinkio\Shlink\IpGeolocation\Resolver\IpLocationResolverInterface;
|
||||
use Symfony\Component\Mercure\Hub;
|
||||
|
||||
return [
|
||||
|
||||
@@ -24,12 +25,14 @@ return [
|
||||
EventDispatcher\Event\VisitLocated::class => [
|
||||
EventDispatcher\Mercure\NotifyVisitToMercure::class,
|
||||
EventDispatcher\RabbitMq\NotifyVisitToRabbitMq::class,
|
||||
EventDispatcher\RedisPubSub\NotifyVisitToRedis::class,
|
||||
EventDispatcher\NotifyVisitToWebHooks::class,
|
||||
EventDispatcher\UpdateGeoLiteDb::class,
|
||||
],
|
||||
EventDispatcher\Event\ShortUrlCreated::class => [
|
||||
EventDispatcher\Mercure\NotifyNewShortUrlToMercure::class,
|
||||
EventDispatcher\RabbitMq\NotifyNewShortUrlToRabbitMq::class,
|
||||
EventDispatcher\RedisPubSub\NotifyNewShortUrlToRedis::class,
|
||||
],
|
||||
],
|
||||
],
|
||||
@@ -42,6 +45,8 @@ return [
|
||||
EventDispatcher\Mercure\NotifyNewShortUrlToMercure::class => ConfigAbstractFactory::class,
|
||||
EventDispatcher\RabbitMq\NotifyVisitToRabbitMq::class => ConfigAbstractFactory::class,
|
||||
EventDispatcher\RabbitMq\NotifyNewShortUrlToRabbitMq::class => ConfigAbstractFactory::class,
|
||||
EventDispatcher\RedisPubSub\NotifyVisitToRedis::class => ConfigAbstractFactory::class,
|
||||
EventDispatcher\RedisPubSub\NotifyNewShortUrlToRedis::class => ConfigAbstractFactory::class,
|
||||
EventDispatcher\UpdateGeoLiteDb::class => ConfigAbstractFactory::class,
|
||||
],
|
||||
|
||||
@@ -58,6 +63,12 @@ return [
|
||||
EventDispatcher\RabbitMq\NotifyNewShortUrlToRabbitMq::class => [
|
||||
EventDispatcher\CloseDbConnectionEventListenerDelegator::class,
|
||||
],
|
||||
EventDispatcher\RedisPubSub\NotifyVisitToRedis::class => [
|
||||
EventDispatcher\CloseDbConnectionEventListenerDelegator::class,
|
||||
],
|
||||
EventDispatcher\RedisPubSub\NotifyNewShortUrlToRedis::class => [
|
||||
EventDispatcher\CloseDbConnectionEventListenerDelegator::class,
|
||||
],
|
||||
EventDispatcher\NotifyVisitToWebHooks::class => [
|
||||
EventDispatcher\CloseDbConnectionEventListenerDelegator::class,
|
||||
],
|
||||
@@ -81,32 +92,46 @@ return [
|
||||
Options\AppOptions::class,
|
||||
],
|
||||
EventDispatcher\Mercure\NotifyVisitToMercure::class => [
|
||||
Hub::class,
|
||||
Mercure\MercureUpdatesGenerator::class,
|
||||
MercureHubPublishingHelper::class,
|
||||
EventDispatcher\PublishingUpdatesGenerator::class,
|
||||
'em',
|
||||
'Logger_Shlink',
|
||||
],
|
||||
EventDispatcher\Mercure\NotifyNewShortUrlToMercure::class => [
|
||||
Hub::class,
|
||||
Mercure\MercureUpdatesGenerator::class,
|
||||
MercureHubPublishingHelper::class,
|
||||
EventDispatcher\PublishingUpdatesGenerator::class,
|
||||
'em',
|
||||
'Logger_Shlink',
|
||||
],
|
||||
EventDispatcher\RabbitMq\NotifyVisitToRabbitMq::class => [
|
||||
RabbitMqPublishingHelper::class,
|
||||
EventDispatcher\PublishingUpdatesGenerator::class,
|
||||
'em',
|
||||
'Logger_Shlink',
|
||||
Visit\Transformer\OrphanVisitDataTransformer::class,
|
||||
ShortUrl\Transformer\ShortUrlDataTransformer::class,
|
||||
Options\RabbitMqOptions::class,
|
||||
],
|
||||
EventDispatcher\RabbitMq\NotifyNewShortUrlToRabbitMq::class => [
|
||||
RabbitMqPublishingHelper::class,
|
||||
EventDispatcher\PublishingUpdatesGenerator::class,
|
||||
'em',
|
||||
'Logger_Shlink',
|
||||
ShortUrl\Transformer\ShortUrlDataTransformer::class,
|
||||
Options\RabbitMqOptions::class,
|
||||
],
|
||||
EventDispatcher\RedisPubSub\NotifyVisitToRedis::class => [
|
||||
RedisPublishingHelper::class,
|
||||
EventDispatcher\PublishingUpdatesGenerator::class,
|
||||
'em',
|
||||
'Logger_Shlink',
|
||||
'config.redis.pub_sub_enabled',
|
||||
],
|
||||
EventDispatcher\RedisPubSub\NotifyNewShortUrlToRedis::class => [
|
||||
RedisPublishingHelper::class,
|
||||
EventDispatcher\PublishingUpdatesGenerator::class,
|
||||
'em',
|
||||
'Logger_Shlink',
|
||||
'config.redis.pub_sub_enabled',
|
||||
],
|
||||
EventDispatcher\UpdateGeoLiteDb::class => [GeolocationDbUpdater::class, 'Logger_Shlink'],
|
||||
],
|
||||
|
||||
|
||||
@@ -19,6 +19,7 @@ enum EnvVars: string
|
||||
case GEOLITE_LICENSE_KEY = 'GEOLITE_LICENSE_KEY';
|
||||
case REDIS_SERVERS = 'REDIS_SERVERS';
|
||||
case REDIS_SENTINEL_SERVICE = 'REDIS_SENTINEL_SERVICE';
|
||||
case REDIS_PUB_SUB_ENABLED = 'REDIS_PUB_SUB_ENABLED';
|
||||
case MERCURE_PUBLIC_HUB_URL = 'MERCURE_PUBLIC_HUB_URL';
|
||||
case MERCURE_INTERNAL_HUB_URL = 'MERCURE_INTERNAL_HUB_URL';
|
||||
case MERCURE_JWT_SECRET = 'MERCURE_JWT_SECRET';
|
||||
|
||||
@@ -0,0 +1,12 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Shlinkio\Shlink\Core\EventDispatcher\Async;
|
||||
|
||||
abstract class AbstractAsyncListener
|
||||
{
|
||||
abstract protected function isEnabled(): bool;
|
||||
|
||||
abstract protected function getRemoteSystem(): RemoteSystem;
|
||||
}
|
||||
@@ -0,0 +1,52 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Shlinkio\Shlink\Core\EventDispatcher\Async;
|
||||
|
||||
use Doctrine\ORM\EntityManagerInterface;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Shlinkio\Shlink\Common\UpdatePublishing\PublishingHelperInterface;
|
||||
use Shlinkio\Shlink\Core\Entity\ShortUrl;
|
||||
use Shlinkio\Shlink\Core\EventDispatcher\Event\ShortUrlCreated;
|
||||
use Shlinkio\Shlink\Core\EventDispatcher\PublishingUpdatesGeneratorInterface;
|
||||
use Throwable;
|
||||
|
||||
abstract class AbstractNotifyNewShortUrlListener extends AbstractAsyncListener
|
||||
{
|
||||
public function __construct(
|
||||
private readonly PublishingHelperInterface $publishingHelper,
|
||||
private readonly PublishingUpdatesGeneratorInterface $updatesGenerator,
|
||||
private readonly EntityManagerInterface $em,
|
||||
private readonly LoggerInterface $logger,
|
||||
) {
|
||||
}
|
||||
|
||||
public function __invoke(ShortUrlCreated $shortUrlCreated): void
|
||||
{
|
||||
if (! $this->isEnabled()) {
|
||||
return;
|
||||
}
|
||||
|
||||
$shortUrlId = $shortUrlCreated->shortUrlId;
|
||||
$shortUrl = $this->em->find(ShortUrl::class, $shortUrlId);
|
||||
$name = $this->getRemoteSystem()->value;
|
||||
|
||||
if ($shortUrl === null) {
|
||||
$this->logger->warning(
|
||||
'Tried to notify {name} for new short URL with id "{shortUrlId}", but it does not exist.',
|
||||
['shortUrlId' => $shortUrlId, 'name' => $name],
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
$this->publishingHelper->publishUpdate($this->updatesGenerator->newShortUrlUpdate($shortUrl));
|
||||
} catch (Throwable $e) {
|
||||
$this->logger->debug(
|
||||
'Error while trying to notify {name} with new short URL. {e}',
|
||||
['e' => $e, 'name' => $name],
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,72 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Shlinkio\Shlink\Core\EventDispatcher\Async;
|
||||
|
||||
use Doctrine\ORM\EntityManagerInterface;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Shlinkio\Shlink\Common\UpdatePublishing\PublishingHelperInterface;
|
||||
use Shlinkio\Shlink\Common\UpdatePublishing\Update;
|
||||
use Shlinkio\Shlink\Core\Entity\Visit;
|
||||
use Shlinkio\Shlink\Core\EventDispatcher\Event\VisitLocated;
|
||||
use Shlinkio\Shlink\Core\EventDispatcher\PublishingUpdatesGeneratorInterface;
|
||||
use Throwable;
|
||||
|
||||
use function Functional\each;
|
||||
|
||||
abstract class AbstractNotifyVisitListener extends AbstractAsyncListener
|
||||
{
|
||||
public function __construct(
|
||||
private readonly PublishingHelperInterface $publishingHelper,
|
||||
private readonly PublishingUpdatesGeneratorInterface $updatesGenerator,
|
||||
private readonly EntityManagerInterface $em,
|
||||
private readonly LoggerInterface $logger,
|
||||
) {
|
||||
}
|
||||
|
||||
public function __invoke(VisitLocated $visitLocated): void
|
||||
{
|
||||
if (! $this->isEnabled()) {
|
||||
return;
|
||||
}
|
||||
|
||||
$visitId = $visitLocated->visitId;
|
||||
$visit = $this->em->find(Visit::class, $visitId);
|
||||
$name = $this->getRemoteSystem()->value;
|
||||
|
||||
if ($visit === null) {
|
||||
$this->logger->warning(
|
||||
'Tried to notify {name} for visit with id "{visitId}", but it does not exist.',
|
||||
['visitId' => $visitId, 'name' => $name],
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
$updates = $this->determineUpdatesForVisit($visit);
|
||||
|
||||
try {
|
||||
each($updates, fn (Update $update) => $this->publishingHelper->publishUpdate($update));
|
||||
} catch (Throwable $e) {
|
||||
$this->logger->debug(
|
||||
'Error while trying to notify {name} with new visit. {e}',
|
||||
['e' => $e, 'name' => $name],
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Update[]
|
||||
*/
|
||||
protected function determineUpdatesForVisit(Visit $visit): array
|
||||
{
|
||||
if ($visit->isOrphan()) {
|
||||
return [$this->updatesGenerator->newOrphanVisitUpdate($visit)];
|
||||
}
|
||||
|
||||
return [
|
||||
$this->updatesGenerator->newShortUrlVisitUpdate($visit),
|
||||
$this->updatesGenerator->newVisitUpdate($visit),
|
||||
];
|
||||
}
|
||||
}
|
||||
12
module/Core/src/EventDispatcher/Async/RemoteSystem.php
Normal file
12
module/Core/src/EventDispatcher/Async/RemoteSystem.php
Normal file
@@ -0,0 +1,12 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Shlinkio\Shlink\Core\EventDispatcher\Async;
|
||||
|
||||
enum RemoteSystem: string
|
||||
{
|
||||
case MERCURE = 'Mercure';
|
||||
case RABBIT_MQ = 'RabbitMQ';
|
||||
case REDIS_PUB_SUB = 'Redis pub/sub';
|
||||
}
|
||||
@@ -4,41 +4,18 @@ declare(strict_types=1);
|
||||
|
||||
namespace Shlinkio\Shlink\Core\EventDispatcher\Mercure;
|
||||
|
||||
use Doctrine\ORM\EntityManagerInterface;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Shlinkio\Shlink\Core\Entity\ShortUrl;
|
||||
use Shlinkio\Shlink\Core\EventDispatcher\Event\ShortUrlCreated;
|
||||
use Shlinkio\Shlink\Core\Mercure\MercureUpdatesGeneratorInterface;
|
||||
use Symfony\Component\Mercure\HubInterface;
|
||||
use Throwable;
|
||||
use Shlinkio\Shlink\Core\EventDispatcher\Async\AbstractNotifyNewShortUrlListener;
|
||||
use Shlinkio\Shlink\Core\EventDispatcher\Async\RemoteSystem;
|
||||
|
||||
class NotifyNewShortUrlToMercure
|
||||
class NotifyNewShortUrlToMercure extends AbstractNotifyNewShortUrlListener
|
||||
{
|
||||
public function __construct(
|
||||
private readonly HubInterface $hub,
|
||||
private readonly MercureUpdatesGeneratorInterface $updatesGenerator,
|
||||
private readonly EntityManagerInterface $em,
|
||||
private readonly LoggerInterface $logger,
|
||||
) {
|
||||
protected function isEnabled(): bool
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
public function __invoke(ShortUrlCreated $shortUrlCreated): void
|
||||
protected function getRemoteSystem(): RemoteSystem
|
||||
{
|
||||
$shortUrlId = $shortUrlCreated->shortUrlId;
|
||||
$shortUrl = $this->em->find(ShortUrl::class, $shortUrlId);
|
||||
|
||||
if ($shortUrl === null) {
|
||||
$this->logger->warning(
|
||||
'Tried to notify Mercure for new short URL with id "{shortUrlId}", but it does not exist.',
|
||||
['shortUrlId' => $shortUrlId],
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
$this->hub->publish($this->updatesGenerator->newShortUrlUpdate($shortUrl));
|
||||
} catch (Throwable $e) {
|
||||
$this->logger->debug('Error while trying to notify mercure hub with new short URL. {e}', ['e' => $e]);
|
||||
}
|
||||
return RemoteSystem::MERCURE;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,61 +4,18 @@ declare(strict_types=1);
|
||||
|
||||
namespace Shlinkio\Shlink\Core\EventDispatcher\Mercure;
|
||||
|
||||
use Doctrine\ORM\EntityManagerInterface;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Shlinkio\Shlink\Core\Entity\Visit;
|
||||
use Shlinkio\Shlink\Core\EventDispatcher\Event\VisitLocated;
|
||||
use Shlinkio\Shlink\Core\Mercure\MercureUpdatesGeneratorInterface;
|
||||
use Symfony\Component\Mercure\HubInterface;
|
||||
use Symfony\Component\Mercure\Update;
|
||||
use Throwable;
|
||||
use Shlinkio\Shlink\Core\EventDispatcher\Async\AbstractNotifyVisitListener;
|
||||
use Shlinkio\Shlink\Core\EventDispatcher\Async\RemoteSystem;
|
||||
|
||||
use function Functional\each;
|
||||
|
||||
class NotifyVisitToMercure
|
||||
class NotifyVisitToMercure extends AbstractNotifyVisitListener
|
||||
{
|
||||
public function __construct(
|
||||
private readonly HubInterface $hub,
|
||||
private readonly MercureUpdatesGeneratorInterface $updatesGenerator,
|
||||
private readonly EntityManagerInterface $em,
|
||||
private readonly LoggerInterface $logger,
|
||||
) {
|
||||
protected function isEnabled(): bool
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
public function __invoke(VisitLocated $shortUrlLocated): void
|
||||
protected function getRemoteSystem(): RemoteSystem
|
||||
{
|
||||
$visitId = $shortUrlLocated->visitId;
|
||||
|
||||
/** @var Visit|null $visit */
|
||||
$visit = $this->em->find(Visit::class, $visitId);
|
||||
if ($visit === null) {
|
||||
$this->logger->warning('Tried to notify mercure for visit with id "{visitId}", but it does not exist.', [
|
||||
'visitId' => $visitId,
|
||||
]);
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
each($this->determineUpdatesForVisit($visit), fn (Update $update) => $this->hub->publish($update));
|
||||
} catch (Throwable $e) {
|
||||
$this->logger->debug('Error while trying to notify mercure hub with new visit. {e}', [
|
||||
'e' => $e,
|
||||
]);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Update[]
|
||||
*/
|
||||
private function determineUpdatesForVisit(Visit $visit): array
|
||||
{
|
||||
if ($visit->isOrphan()) {
|
||||
return [$this->updatesGenerator->newOrphanVisitUpdate($visit)];
|
||||
}
|
||||
|
||||
return [
|
||||
$this->updatesGenerator->newShortUrlVisitUpdate($visit),
|
||||
$this->updatesGenerator->newVisitUpdate($visit),
|
||||
];
|
||||
return RemoteSystem::MERCURE;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,17 +2,14 @@
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Shlinkio\Shlink\Core\Mercure;
|
||||
namespace Shlinkio\Shlink\Core\EventDispatcher;
|
||||
|
||||
use Shlinkio\Shlink\Common\Rest\DataTransformerInterface;
|
||||
use Shlinkio\Shlink\Common\UpdatePublishing\Update;
|
||||
use Shlinkio\Shlink\Core\Entity\ShortUrl;
|
||||
use Shlinkio\Shlink\Core\Entity\Visit;
|
||||
use Shlinkio\Shlink\Core\EventDispatcher\Topic;
|
||||
use Symfony\Component\Mercure\Update;
|
||||
|
||||
use function Shlinkio\Shlink\Common\json_encode;
|
||||
|
||||
final class MercureUpdatesGenerator implements MercureUpdatesGeneratorInterface
|
||||
final class PublishingUpdatesGenerator implements PublishingUpdatesGeneratorInterface
|
||||
{
|
||||
public function __construct(
|
||||
private readonly DataTransformerInterface $shortUrlTransformer,
|
||||
@@ -22,17 +19,17 @@ final class MercureUpdatesGenerator implements MercureUpdatesGeneratorInterface
|
||||
|
||||
public function newVisitUpdate(Visit $visit): Update
|
||||
{
|
||||
return new Update(Topic::NEW_VISIT->value, json_encode([
|
||||
return Update::forTopicAndPayload(Topic::NEW_VISIT->value, [
|
||||
'shortUrl' => $this->shortUrlTransformer->transform($visit->getShortUrl()),
|
||||
'visit' => $visit,
|
||||
]));
|
||||
'visit' => $visit->jsonSerialize(),
|
||||
]);
|
||||
}
|
||||
|
||||
public function newOrphanVisitUpdate(Visit $visit): Update
|
||||
{
|
||||
return new Update(Topic::NEW_ORPHAN_VISIT->value, json_encode([
|
||||
return Update::forTopicAndPayload(Topic::NEW_ORPHAN_VISIT->value, [
|
||||
'visit' => $this->orphanVisitTransformer->transform($visit),
|
||||
]));
|
||||
]);
|
||||
}
|
||||
|
||||
public function newShortUrlVisitUpdate(Visit $visit): Update
|
||||
@@ -40,16 +37,16 @@ final class MercureUpdatesGenerator implements MercureUpdatesGeneratorInterface
|
||||
$shortUrl = $visit->getShortUrl();
|
||||
$topic = Topic::newShortUrlVisit($shortUrl?->getShortCode());
|
||||
|
||||
return new Update($topic, json_encode([
|
||||
return Update::forTopicAndPayload($topic, [
|
||||
'shortUrl' => $this->shortUrlTransformer->transform($shortUrl),
|
||||
'visit' => $visit,
|
||||
]));
|
||||
'visit' => $visit->jsonSerialize(),
|
||||
]);
|
||||
}
|
||||
|
||||
public function newShortUrlUpdate(ShortUrl $shortUrl): Update
|
||||
{
|
||||
return new Update(Topic::NEW_SHORT_URL->value, json_encode([
|
||||
return Update::forTopicAndPayload(Topic::NEW_SHORT_URL->value, [
|
||||
'shortUrl' => $this->shortUrlTransformer->transform($shortUrl),
|
||||
]));
|
||||
]);
|
||||
}
|
||||
}
|
||||
@@ -2,13 +2,13 @@
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Shlinkio\Shlink\Core\Mercure;
|
||||
namespace Shlinkio\Shlink\Core\EventDispatcher;
|
||||
|
||||
use Shlinkio\Shlink\Common\UpdatePublishing\Update;
|
||||
use Shlinkio\Shlink\Core\Entity\ShortUrl;
|
||||
use Shlinkio\Shlink\Core\Entity\Visit;
|
||||
use Symfony\Component\Mercure\Update;
|
||||
|
||||
interface MercureUpdatesGeneratorInterface
|
||||
interface PublishingUpdatesGeneratorInterface
|
||||
{
|
||||
public function newVisitUpdate(Visit $visit): Update;
|
||||
|
||||
@@ -6,49 +6,31 @@ namespace Shlinkio\Shlink\Core\EventDispatcher\RabbitMq;
|
||||
|
||||
use Doctrine\ORM\EntityManagerInterface;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Shlinkio\Shlink\Common\RabbitMq\RabbitMqPublishingHelperInterface;
|
||||
use Shlinkio\Shlink\Common\Rest\DataTransformerInterface;
|
||||
use Shlinkio\Shlink\Core\Entity\ShortUrl;
|
||||
use Shlinkio\Shlink\Core\EventDispatcher\Event\ShortUrlCreated;
|
||||
use Shlinkio\Shlink\Core\EventDispatcher\Topic;
|
||||
use Shlinkio\Shlink\Common\UpdatePublishing\PublishingHelperInterface;
|
||||
use Shlinkio\Shlink\Core\EventDispatcher\Async\AbstractNotifyNewShortUrlListener;
|
||||
use Shlinkio\Shlink\Core\EventDispatcher\Async\RemoteSystem;
|
||||
use Shlinkio\Shlink\Core\EventDispatcher\PublishingUpdatesGeneratorInterface;
|
||||
use Shlinkio\Shlink\Core\Options\RabbitMqOptions;
|
||||
use Throwable;
|
||||
|
||||
class NotifyNewShortUrlToRabbitMq
|
||||
class NotifyNewShortUrlToRabbitMq extends AbstractNotifyNewShortUrlListener
|
||||
{
|
||||
public function __construct(
|
||||
private readonly RabbitMqPublishingHelperInterface $rabbitMqHelper,
|
||||
private readonly EntityManagerInterface $em,
|
||||
private readonly LoggerInterface $logger,
|
||||
private readonly DataTransformerInterface $shortUrlTransformer,
|
||||
PublishingHelperInterface $rabbitMqHelper,
|
||||
PublishingUpdatesGeneratorInterface $updatesGenerator,
|
||||
EntityManagerInterface $em,
|
||||
LoggerInterface $logger,
|
||||
private readonly RabbitMqOptions $options,
|
||||
) {
|
||||
parent::__construct($rabbitMqHelper, $updatesGenerator, $em, $logger);
|
||||
}
|
||||
|
||||
public function __invoke(ShortUrlCreated $shortUrlCreated): void
|
||||
protected function isEnabled(): bool
|
||||
{
|
||||
if (! $this->options->isEnabled()) {
|
||||
return;
|
||||
}
|
||||
return $this->options->isEnabled();
|
||||
}
|
||||
|
||||
$shortUrlId = $shortUrlCreated->shortUrlId;
|
||||
$shortUrl = $this->em->find(ShortUrl::class, $shortUrlId);
|
||||
|
||||
if ($shortUrl === null) {
|
||||
$this->logger->warning(
|
||||
'Tried to notify RabbitMQ for new short URL with id "{shortUrlId}", but it does not exist.',
|
||||
['shortUrlId' => $shortUrlId],
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
$this->rabbitMqHelper->publishPayloadInQueue(
|
||||
['shortUrl' => $this->shortUrlTransformer->transform($shortUrl)],
|
||||
Topic::NEW_SHORT_URL->value,
|
||||
);
|
||||
} catch (Throwable $e) {
|
||||
$this->logger->debug('Error while trying to notify RabbitMQ with new short URL. {e}', ['e' => $e]);
|
||||
}
|
||||
protected function getRemoteSystem(): RemoteSystem
|
||||
{
|
||||
return RemoteSystem::RABBIT_MQ;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,86 +6,66 @@ namespace Shlinkio\Shlink\Core\EventDispatcher\RabbitMq;
|
||||
|
||||
use Doctrine\ORM\EntityManagerInterface;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Shlinkio\Shlink\Common\RabbitMq\RabbitMqPublishingHelperInterface;
|
||||
use Shlinkio\Shlink\Common\Rest\DataTransformerInterface;
|
||||
use Shlinkio\Shlink\Common\UpdatePublishing\PublishingHelperInterface;
|
||||
use Shlinkio\Shlink\Common\UpdatePublishing\Update;
|
||||
use Shlinkio\Shlink\Core\Entity\Visit;
|
||||
use Shlinkio\Shlink\Core\EventDispatcher\Event\VisitLocated;
|
||||
use Shlinkio\Shlink\Core\EventDispatcher\Async\AbstractNotifyVisitListener;
|
||||
use Shlinkio\Shlink\Core\EventDispatcher\Async\RemoteSystem;
|
||||
use Shlinkio\Shlink\Core\EventDispatcher\PublishingUpdatesGeneratorInterface;
|
||||
use Shlinkio\Shlink\Core\EventDispatcher\Topic;
|
||||
use Shlinkio\Shlink\Core\Options\RabbitMqOptions;
|
||||
use Throwable;
|
||||
|
||||
class NotifyVisitToRabbitMq
|
||||
class NotifyVisitToRabbitMq extends AbstractNotifyVisitListener
|
||||
{
|
||||
public function __construct(
|
||||
private readonly RabbitMqPublishingHelperInterface $rabbitMqHelper,
|
||||
private readonly EntityManagerInterface $em,
|
||||
private readonly LoggerInterface $logger,
|
||||
PublishingHelperInterface $rabbitMqHelper,
|
||||
PublishingUpdatesGeneratorInterface $updatesGenerator,
|
||||
EntityManagerInterface $em,
|
||||
LoggerInterface $logger,
|
||||
private readonly DataTransformerInterface $orphanVisitTransformer,
|
||||
private readonly DataTransformerInterface $shortUrlTransformer,
|
||||
private readonly RabbitMqOptions $options,
|
||||
) {
|
||||
}
|
||||
|
||||
public function __invoke(VisitLocated $shortUrlLocated): void
|
||||
{
|
||||
if (! $this->options->isEnabled()) {
|
||||
return;
|
||||
}
|
||||
|
||||
$visitId = $shortUrlLocated->visitId;
|
||||
$visit = $this->em->find(Visit::class, $visitId);
|
||||
|
||||
if ($visit === null) {
|
||||
$this->logger->warning('Tried to notify RabbitMQ for visit with id "{visitId}", but it does not exist.', [
|
||||
'visitId' => $visitId,
|
||||
]);
|
||||
return;
|
||||
}
|
||||
|
||||
$queues = $this->determineQueuesToPublishTo($visit);
|
||||
$payload = $this->visitToPayload($visit);
|
||||
|
||||
try {
|
||||
foreach ($queues as $queue) {
|
||||
$this->rabbitMqHelper->publishPayloadInQueue($payload, $queue);
|
||||
}
|
||||
} catch (Throwable $e) {
|
||||
$this->logger->debug('Error while trying to notify RabbitMQ with new visit. {e}', ['e' => $e]);
|
||||
}
|
||||
parent::__construct($rabbitMqHelper, $updatesGenerator, $em, $logger);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return string[]
|
||||
* @return Update[]
|
||||
*/
|
||||
private function determineQueuesToPublishTo(Visit $visit): array
|
||||
protected function determineUpdatesForVisit(Visit $visit): array
|
||||
{
|
||||
if ($visit->isOrphan()) {
|
||||
return [Topic::NEW_ORPHAN_VISIT->value];
|
||||
// Once the two deprecated cases below have been removed, make parent method private
|
||||
if (! $this->options->legacyVisitsPublishing()) {
|
||||
return parent::determineUpdatesForVisit($visit);
|
||||
}
|
||||
|
||||
return [
|
||||
Topic::NEW_VISIT->value,
|
||||
Topic::newShortUrlVisit($visit->getShortUrl()?->getShortCode()),
|
||||
];
|
||||
}
|
||||
|
||||
private function visitToPayload(Visit $visit): array
|
||||
{
|
||||
// This was defined incorrectly.
|
||||
// According to the spec, both the visit and the short URL it belongs to, should be published.
|
||||
// The shape should be ['visit' => [...], 'shortUrl' => ?[...]]
|
||||
// However, this would be a breaking change, so we need a flag that determines the shape of the payload.
|
||||
if ($this->options->legacyVisitsPublishing()) {
|
||||
return ! $visit->isOrphan() ? $visit->jsonSerialize() : $this->orphanVisitTransformer->transform($visit);
|
||||
}
|
||||
return $visit->isOrphan()
|
||||
? [
|
||||
Update::forTopicAndPayload(
|
||||
Topic::NEW_ORPHAN_VISIT->value,
|
||||
$this->orphanVisitTransformer->transform($visit),
|
||||
),
|
||||
]
|
||||
: [
|
||||
Update::forTopicAndPayload(Topic::NEW_VISIT->value, $visit->jsonSerialize()),
|
||||
Update::forTopicAndPayload(
|
||||
Topic::newShortUrlVisit($visit->getShortUrl()?->getShortCode()),
|
||||
$visit->jsonSerialize(),
|
||||
),
|
||||
];
|
||||
}
|
||||
|
||||
if ($visit->isOrphan()) {
|
||||
return ['visit' => $this->orphanVisitTransformer->transform($visit)];
|
||||
}
|
||||
protected function isEnabled(): bool
|
||||
{
|
||||
return $this->options->isEnabled();
|
||||
}
|
||||
|
||||
return [
|
||||
'visit' => $visit->jsonSerialize(),
|
||||
'shortUrl' => $this->shortUrlTransformer->transform($visit->getShortUrl()),
|
||||
];
|
||||
protected function getRemoteSystem(): RemoteSystem
|
||||
{
|
||||
return RemoteSystem::RABBIT_MQ;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,35 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Shlinkio\Shlink\Core\EventDispatcher\RedisPubSub;
|
||||
|
||||
use Doctrine\ORM\EntityManagerInterface;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Shlinkio\Shlink\Common\UpdatePublishing\PublishingHelperInterface;
|
||||
use Shlinkio\Shlink\Core\EventDispatcher\Async\AbstractNotifyNewShortUrlListener;
|
||||
use Shlinkio\Shlink\Core\EventDispatcher\Async\RemoteSystem;
|
||||
use Shlinkio\Shlink\Core\EventDispatcher\PublishingUpdatesGeneratorInterface;
|
||||
|
||||
class NotifyNewShortUrlToRedis extends AbstractNotifyNewShortUrlListener
|
||||
{
|
||||
public function __construct(
|
||||
PublishingHelperInterface $redisHelper,
|
||||
PublishingUpdatesGeneratorInterface $updatesGenerator,
|
||||
EntityManagerInterface $em,
|
||||
LoggerInterface $logger,
|
||||
private readonly bool $enabled,
|
||||
) {
|
||||
parent::__construct($redisHelper, $updatesGenerator, $em, $logger);
|
||||
}
|
||||
|
||||
protected function isEnabled(): bool
|
||||
{
|
||||
return $this->enabled;
|
||||
}
|
||||
|
||||
protected function getRemoteSystem(): RemoteSystem
|
||||
{
|
||||
return RemoteSystem::REDIS_PUB_SUB;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,35 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Shlinkio\Shlink\Core\EventDispatcher\RedisPubSub;
|
||||
|
||||
use Doctrine\ORM\EntityManagerInterface;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Shlinkio\Shlink\Common\UpdatePublishing\PublishingHelperInterface;
|
||||
use Shlinkio\Shlink\Core\EventDispatcher\Async\AbstractNotifyVisitListener;
|
||||
use Shlinkio\Shlink\Core\EventDispatcher\Async\RemoteSystem;
|
||||
use Shlinkio\Shlink\Core\EventDispatcher\PublishingUpdatesGeneratorInterface;
|
||||
|
||||
class NotifyVisitToRedis extends AbstractNotifyVisitListener
|
||||
{
|
||||
public function __construct(
|
||||
PublishingHelperInterface $redisHelper,
|
||||
PublishingUpdatesGeneratorInterface $updatesGenerator,
|
||||
EntityManagerInterface $em,
|
||||
LoggerInterface $logger,
|
||||
private readonly bool $enabled,
|
||||
) {
|
||||
parent::__construct($redisHelper, $updatesGenerator, $em, $logger);
|
||||
}
|
||||
|
||||
protected function isEnabled(): bool
|
||||
{
|
||||
return $this->enabled;
|
||||
}
|
||||
|
||||
protected function getRemoteSystem(): RemoteSystem
|
||||
{
|
||||
return RemoteSystem::REDIS_PUB_SUB;
|
||||
}
|
||||
}
|
||||
@@ -11,7 +11,6 @@ class OrphanVisitDataTransformer implements DataTransformerInterface
|
||||
{
|
||||
/**
|
||||
* @param Visit $visit
|
||||
* @return array
|
||||
*/
|
||||
public function transform($visit): array // phpcs:ignore
|
||||
{
|
||||
|
||||
@@ -11,32 +11,32 @@ use Prophecy\Argument;
|
||||
use Prophecy\PhpUnit\ProphecyTrait;
|
||||
use Prophecy\Prophecy\ObjectProphecy;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Shlinkio\Shlink\Common\UpdatePublishing\PublishingHelperInterface;
|
||||
use Shlinkio\Shlink\Common\UpdatePublishing\Update;
|
||||
use Shlinkio\Shlink\Core\Entity\ShortUrl;
|
||||
use Shlinkio\Shlink\Core\EventDispatcher\Event\ShortUrlCreated;
|
||||
use Shlinkio\Shlink\Core\EventDispatcher\Mercure\NotifyNewShortUrlToMercure;
|
||||
use Shlinkio\Shlink\Core\Mercure\MercureUpdatesGeneratorInterface;
|
||||
use Symfony\Component\Mercure\HubInterface;
|
||||
use Symfony\Component\Mercure\Update;
|
||||
use Shlinkio\Shlink\Core\EventDispatcher\PublishingUpdatesGeneratorInterface;
|
||||
|
||||
class NotifyNewShortUrlToMercureTest extends TestCase
|
||||
{
|
||||
use ProphecyTrait;
|
||||
|
||||
private NotifyNewShortUrlToMercure $listener;
|
||||
private ObjectProphecy $hub;
|
||||
private ObjectProphecy $helper;
|
||||
private ObjectProphecy $updatesGenerator;
|
||||
private ObjectProphecy $em;
|
||||
private ObjectProphecy $logger;
|
||||
|
||||
protected function setUp(): void
|
||||
{
|
||||
$this->hub = $this->prophesize(HubInterface::class);
|
||||
$this->updatesGenerator = $this->prophesize(MercureUpdatesGeneratorInterface::class);
|
||||
$this->helper = $this->prophesize(PublishingHelperInterface::class);
|
||||
$this->updatesGenerator = $this->prophesize(PublishingUpdatesGeneratorInterface::class);
|
||||
$this->em = $this->prophesize(EntityManagerInterface::class);
|
||||
$this->logger = $this->prophesize(LoggerInterface::class);
|
||||
|
||||
$this->listener = new NotifyNewShortUrlToMercure(
|
||||
$this->hub->reveal(),
|
||||
$this->helper->reveal(),
|
||||
$this->updatesGenerator->reveal(),
|
||||
$this->em->reveal(),
|
||||
$this->logger->reveal(),
|
||||
@@ -52,10 +52,10 @@ class NotifyNewShortUrlToMercureTest extends TestCase
|
||||
|
||||
$find->shouldHaveBeenCalledOnce();
|
||||
$this->logger->warning(
|
||||
'Tried to notify Mercure for new short URL with id "{shortUrlId}", but it does not exist.',
|
||||
['shortUrlId' => '123'],
|
||||
'Tried to notify {name} for new short URL with id "{shortUrlId}", but it does not exist.',
|
||||
['shortUrlId' => '123', 'name' => 'Mercure'],
|
||||
)->shouldHaveBeenCalledOnce();
|
||||
$this->hub->publish(Argument::cetera())->shouldNotHaveBeenCalled();
|
||||
$this->helper->publishUpdate(Argument::cetera())->shouldNotHaveBeenCalled();
|
||||
$this->updatesGenerator->newShortUrlUpdate(Argument::cetera())->shouldNotHaveBeenCalled();
|
||||
$this->logger->debug(Argument::cetera())->shouldNotHaveBeenCalled();
|
||||
}
|
||||
@@ -64,17 +64,16 @@ class NotifyNewShortUrlToMercureTest extends TestCase
|
||||
public function expectedNotificationIsPublished(): void
|
||||
{
|
||||
$shortUrl = ShortUrl::withLongUrl('');
|
||||
$update = new Update([]);
|
||||
$update = Update::forTopicAndPayload('', []);
|
||||
|
||||
$find = $this->em->find(ShortUrl::class, '123')->willReturn($shortUrl);
|
||||
$newUpdate = $this->updatesGenerator->newShortUrlUpdate($shortUrl)->willReturn($update);
|
||||
$publish = $this->hub->publish($update)->willReturn('');
|
||||
|
||||
($this->listener)(new ShortUrlCreated('123'));
|
||||
|
||||
$find->shouldHaveBeenCalledOnce();
|
||||
$newUpdate->shouldHaveBeenCalledOnce();
|
||||
$publish->shouldHaveBeenCalledOnce();
|
||||
$this->helper->publishUpdate($update)->shouldHaveBeenCalledOnce();
|
||||
$this->logger->warning(Argument::cetera())->shouldNotHaveBeenCalled();
|
||||
$this->logger->debug(Argument::cetera())->shouldNotHaveBeenCalled();
|
||||
}
|
||||
@@ -83,12 +82,12 @@ class NotifyNewShortUrlToMercureTest extends TestCase
|
||||
public function messageIsPrintedIfPublishingFails(): void
|
||||
{
|
||||
$shortUrl = ShortUrl::withLongUrl('');
|
||||
$update = new Update([]);
|
||||
$update = Update::forTopicAndPayload('', []);
|
||||
$e = new Exception('Error');
|
||||
|
||||
$find = $this->em->find(ShortUrl::class, '123')->willReturn($shortUrl);
|
||||
$newUpdate = $this->updatesGenerator->newShortUrlUpdate($shortUrl)->willReturn($update);
|
||||
$publish = $this->hub->publish($update)->willThrow($e);
|
||||
$publish = $this->helper->publishUpdate($update)->willThrow($e);
|
||||
|
||||
($this->listener)(new ShortUrlCreated('123'));
|
||||
|
||||
@@ -97,8 +96,8 @@ class NotifyNewShortUrlToMercureTest extends TestCase
|
||||
$publish->shouldHaveBeenCalledOnce();
|
||||
$this->logger->warning(Argument::cetera())->shouldNotHaveBeenCalled();
|
||||
$this->logger->debug(
|
||||
'Error while trying to notify mercure hub with new short URL. {e}',
|
||||
['e' => $e],
|
||||
'Error while trying to notify {name} with new short URL. {e}',
|
||||
['e' => $e, 'name' => 'Mercure'],
|
||||
)->shouldHaveBeenCalledOnce();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,35 +11,35 @@ use Prophecy\PhpUnit\ProphecyTrait;
|
||||
use Prophecy\Prophecy\ObjectProphecy;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use RuntimeException;
|
||||
use Shlinkio\Shlink\Common\UpdatePublishing\PublishingHelperInterface;
|
||||
use Shlinkio\Shlink\Common\UpdatePublishing\Update;
|
||||
use Shlinkio\Shlink\Core\Entity\ShortUrl;
|
||||
use Shlinkio\Shlink\Core\Entity\Visit;
|
||||
use Shlinkio\Shlink\Core\EventDispatcher\Event\VisitLocated;
|
||||
use Shlinkio\Shlink\Core\EventDispatcher\Mercure\NotifyVisitToMercure;
|
||||
use Shlinkio\Shlink\Core\Mercure\MercureUpdatesGeneratorInterface;
|
||||
use Shlinkio\Shlink\Core\EventDispatcher\PublishingUpdatesGeneratorInterface;
|
||||
use Shlinkio\Shlink\Core\Model\Visitor;
|
||||
use Shlinkio\Shlink\Core\Visit\Model\VisitType;
|
||||
use Symfony\Component\Mercure\HubInterface;
|
||||
use Symfony\Component\Mercure\Update;
|
||||
|
||||
class NotifyVisitToMercureTest extends TestCase
|
||||
{
|
||||
use ProphecyTrait;
|
||||
|
||||
private NotifyVisitToMercure $listener;
|
||||
private ObjectProphecy $hub;
|
||||
private ObjectProphecy $helper;
|
||||
private ObjectProphecy $updatesGenerator;
|
||||
private ObjectProphecy $em;
|
||||
private ObjectProphecy $logger;
|
||||
|
||||
public function setUp(): void
|
||||
{
|
||||
$this->hub = $this->prophesize(HubInterface::class);
|
||||
$this->updatesGenerator = $this->prophesize(MercureUpdatesGeneratorInterface::class);
|
||||
$this->helper = $this->prophesize(PublishingHelperInterface::class);
|
||||
$this->updatesGenerator = $this->prophesize(PublishingUpdatesGeneratorInterface::class);
|
||||
$this->em = $this->prophesize(EntityManagerInterface::class);
|
||||
$this->logger = $this->prophesize(LoggerInterface::class);
|
||||
|
||||
$this->listener = new NotifyVisitToMercure(
|
||||
$this->hub->reveal(),
|
||||
$this->helper->reveal(),
|
||||
$this->updatesGenerator->reveal(),
|
||||
$this->em->reveal(),
|
||||
$this->logger->reveal(),
|
||||
@@ -52,8 +52,8 @@ class NotifyVisitToMercureTest extends TestCase
|
||||
$visitId = '123';
|
||||
$findVisit = $this->em->find(Visit::class, $visitId)->willReturn(null);
|
||||
$logWarning = $this->logger->warning(
|
||||
'Tried to notify mercure for visit with id "{visitId}", but it does not exist.',
|
||||
['visitId' => $visitId],
|
||||
'Tried to notify {name} for visit with id "{visitId}", but it does not exist.',
|
||||
['visitId' => $visitId, 'name' => 'Mercure'],
|
||||
);
|
||||
$logDebug = $this->logger->debug(Argument::cetera());
|
||||
$buildNewShortUrlVisitUpdate = $this->updatesGenerator->newShortUrlVisitUpdate(
|
||||
@@ -61,7 +61,7 @@ class NotifyVisitToMercureTest extends TestCase
|
||||
);
|
||||
$buildNewOrphanVisitUpdate = $this->updatesGenerator->newOrphanVisitUpdate(Argument::type(Visit::class));
|
||||
$buildNewVisitUpdate = $this->updatesGenerator->newVisitUpdate(Argument::type(Visit::class));
|
||||
$publish = $this->hub->publish(Argument::type(Update::class));
|
||||
$publish = $this->helper->publishUpdate(Argument::type(Update::class));
|
||||
|
||||
($this->listener)(new VisitLocated($visitId));
|
||||
|
||||
@@ -79,7 +79,7 @@ class NotifyVisitToMercureTest extends TestCase
|
||||
{
|
||||
$visitId = '123';
|
||||
$visit = Visit::forValidShortUrl(ShortUrl::createEmpty(), Visitor::emptyInstance());
|
||||
$update = new Update('', '');
|
||||
$update = Update::forTopicAndPayload('', []);
|
||||
|
||||
$findVisit = $this->em->find(Visit::class, $visitId)->willReturn($visit);
|
||||
$logWarning = $this->logger->warning(Argument::cetera());
|
||||
@@ -87,7 +87,7 @@ class NotifyVisitToMercureTest extends TestCase
|
||||
$buildNewShortUrlVisitUpdate = $this->updatesGenerator->newShortUrlVisitUpdate($visit)->willReturn($update);
|
||||
$buildNewOrphanVisitUpdate = $this->updatesGenerator->newOrphanVisitUpdate($visit)->willReturn($update);
|
||||
$buildNewVisitUpdate = $this->updatesGenerator->newVisitUpdate($visit)->willReturn($update);
|
||||
$publish = $this->hub->publish($update);
|
||||
$publish = $this->helper->publishUpdate($update);
|
||||
|
||||
($this->listener)(new VisitLocated($visitId));
|
||||
|
||||
@@ -105,18 +105,19 @@ class NotifyVisitToMercureTest extends TestCase
|
||||
{
|
||||
$visitId = '123';
|
||||
$visit = Visit::forValidShortUrl(ShortUrl::createEmpty(), Visitor::emptyInstance());
|
||||
$update = new Update('', '');
|
||||
$update = Update::forTopicAndPayload('', []);
|
||||
$e = new RuntimeException('Error');
|
||||
|
||||
$findVisit = $this->em->find(Visit::class, $visitId)->willReturn($visit);
|
||||
$logWarning = $this->logger->warning(Argument::cetera());
|
||||
$logDebug = $this->logger->debug('Error while trying to notify mercure hub with new visit. {e}', [
|
||||
$logDebug = $this->logger->debug('Error while trying to notify {name} with new visit. {e}', [
|
||||
'e' => $e,
|
||||
'name' => 'Mercure',
|
||||
]);
|
||||
$buildNewShortUrlVisitUpdate = $this->updatesGenerator->newShortUrlVisitUpdate($visit)->willReturn($update);
|
||||
$buildNewOrphanVisitUpdate = $this->updatesGenerator->newOrphanVisitUpdate($visit)->willReturn($update);
|
||||
$buildNewVisitUpdate = $this->updatesGenerator->newVisitUpdate($visit)->willReturn($update);
|
||||
$publish = $this->hub->publish($update)->willThrow($e);
|
||||
$publish = $this->helper->publishUpdate($update)->willThrow($e);
|
||||
|
||||
($this->listener)(new VisitLocated($visitId));
|
||||
|
||||
@@ -136,7 +137,7 @@ class NotifyVisitToMercureTest extends TestCase
|
||||
public function notificationsAreSentForOrphanVisits(Visit $visit): void
|
||||
{
|
||||
$visitId = '123';
|
||||
$update = new Update('', '');
|
||||
$update = Update::forTopicAndPayload('', []);
|
||||
|
||||
$findVisit = $this->em->find(Visit::class, $visitId)->willReturn($visit);
|
||||
$logWarning = $this->logger->warning(Argument::cetera());
|
||||
@@ -144,7 +145,7 @@ class NotifyVisitToMercureTest extends TestCase
|
||||
$buildNewShortUrlVisitUpdate = $this->updatesGenerator->newShortUrlVisitUpdate($visit)->willReturn($update);
|
||||
$buildNewOrphanVisitUpdate = $this->updatesGenerator->newOrphanVisitUpdate($visit)->willReturn($update);
|
||||
$buildNewVisitUpdate = $this->updatesGenerator->newVisitUpdate($visit)->willReturn($update);
|
||||
$publish = $this->hub->publish($update);
|
||||
$publish = $this->helper->publishUpdate($update);
|
||||
|
||||
($this->listener)(new VisitLocated($visitId));
|
||||
|
||||
|
||||
@@ -2,13 +2,14 @@
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace ShlinkioTest\Shlink\Core\Mercure;
|
||||
namespace ShlinkioTest\Shlink\Core\EventDispatcher;
|
||||
|
||||
use PHPUnit\Framework\TestCase;
|
||||
use Shlinkio\Shlink\Common\UpdatePublishing\Update;
|
||||
use Shlinkio\Shlink\Core\Entity\ShortUrl;
|
||||
use Shlinkio\Shlink\Core\Entity\Visit;
|
||||
use Shlinkio\Shlink\Core\EventDispatcher\PublishingUpdatesGenerator;
|
||||
use Shlinkio\Shlink\Core\EventDispatcher\Topic;
|
||||
use Shlinkio\Shlink\Core\Mercure\MercureUpdatesGenerator;
|
||||
use Shlinkio\Shlink\Core\Model\ShortUrlMeta;
|
||||
use Shlinkio\Shlink\Core\Model\Visitor;
|
||||
use Shlinkio\Shlink\Core\ShortUrl\Helper\ShortUrlStringifier;
|
||||
@@ -16,15 +17,13 @@ use Shlinkio\Shlink\Core\ShortUrl\Transformer\ShortUrlDataTransformer;
|
||||
use Shlinkio\Shlink\Core\Visit\Model\VisitType;
|
||||
use Shlinkio\Shlink\Core\Visit\Transformer\OrphanVisitDataTransformer;
|
||||
|
||||
use function Shlinkio\Shlink\Common\json_decode;
|
||||
|
||||
class MercureUpdatesGeneratorTest extends TestCase
|
||||
class PublishingUpdatesGeneratorTest extends TestCase
|
||||
{
|
||||
private MercureUpdatesGenerator $generator;
|
||||
private PublishingUpdatesGenerator $generator;
|
||||
|
||||
public function setUp(): void
|
||||
{
|
||||
$this->generator = new MercureUpdatesGenerator(
|
||||
$this->generator = new PublishingUpdatesGenerator(
|
||||
new ShortUrlDataTransformer(new ShortUrlStringifier([])),
|
||||
new OrphanVisitDataTransformer(),
|
||||
);
|
||||
@@ -43,9 +42,10 @@ class MercureUpdatesGeneratorTest extends TestCase
|
||||
]));
|
||||
$visit = Visit::forValidShortUrl($shortUrl, Visitor::emptyInstance());
|
||||
|
||||
/** @var Update $update */
|
||||
$update = $this->generator->{$method}($visit);
|
||||
|
||||
self::assertEquals([$expectedTopic], $update->getTopics());
|
||||
self::assertEquals($expectedTopic, $update->topic);
|
||||
self::assertEquals([
|
||||
'shortUrl' => [
|
||||
'shortCode' => $shortUrl->getShortCode(),
|
||||
@@ -71,7 +71,7 @@ class MercureUpdatesGeneratorTest extends TestCase
|
||||
'date' => $visit->getDate()->toAtomString(),
|
||||
'potentialBot' => false,
|
||||
],
|
||||
], json_decode($update->getData()));
|
||||
], $update->payload);
|
||||
}
|
||||
|
||||
public function provideMethod(): iterable
|
||||
@@ -88,7 +88,7 @@ class MercureUpdatesGeneratorTest extends TestCase
|
||||
{
|
||||
$update = $this->generator->newOrphanVisitUpdate($orphanVisit);
|
||||
|
||||
self::assertEquals(['https://shlink.io/new-orphan-visit'], $update->getTopics());
|
||||
self::assertEquals('https://shlink.io/new-orphan-visit', $update->topic);
|
||||
self::assertEquals([
|
||||
'visit' => [
|
||||
'referer' => '',
|
||||
@@ -99,7 +99,7 @@ class MercureUpdatesGeneratorTest extends TestCase
|
||||
'visitedUrl' => $orphanVisit->visitedUrl(),
|
||||
'type' => $orphanVisit->type()->value,
|
||||
],
|
||||
], json_decode($update->getData()));
|
||||
], $update->payload);
|
||||
}
|
||||
|
||||
public function provideOrphanVisits(): iterable
|
||||
@@ -122,7 +122,7 @@ class MercureUpdatesGeneratorTest extends TestCase
|
||||
|
||||
$update = $this->generator->newShortUrlUpdate($shortUrl);
|
||||
|
||||
self::assertEquals([Topic::NEW_SHORT_URL->value], $update->getTopics());
|
||||
self::assertEquals(Topic::NEW_SHORT_URL->value, $update->topic);
|
||||
self::assertEquals(['shortUrl' => [
|
||||
'shortCode' => $shortUrl->getShortCode(),
|
||||
'shortUrl' => 'http:/' . $shortUrl->getShortCode(),
|
||||
@@ -139,6 +139,6 @@ class MercureUpdatesGeneratorTest extends TestCase
|
||||
'title' => $shortUrl->title(),
|
||||
'crawlable' => false,
|
||||
'forwardQuery' => true,
|
||||
],], json_decode($update->getData()));
|
||||
]], $update->payload);
|
||||
}
|
||||
}
|
||||
@@ -13,14 +13,14 @@ use Prophecy\PhpUnit\ProphecyTrait;
|
||||
use Prophecy\Prophecy\ObjectProphecy;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use RuntimeException;
|
||||
use Shlinkio\Shlink\Common\RabbitMq\RabbitMqPublishingHelperInterface;
|
||||
use Shlinkio\Shlink\Common\UpdatePublishing\PublishingHelperInterface;
|
||||
use Shlinkio\Shlink\Common\UpdatePublishing\Update;
|
||||
use Shlinkio\Shlink\Core\Entity\ShortUrl;
|
||||
use Shlinkio\Shlink\Core\EventDispatcher\Event\ShortUrlCreated;
|
||||
use Shlinkio\Shlink\Core\EventDispatcher\PublishingUpdatesGeneratorInterface;
|
||||
use Shlinkio\Shlink\Core\EventDispatcher\RabbitMq\NotifyNewShortUrlToRabbitMq;
|
||||
use Shlinkio\Shlink\Core\EventDispatcher\Topic;
|
||||
use Shlinkio\Shlink\Core\Options\RabbitMqOptions;
|
||||
use Shlinkio\Shlink\Core\ShortUrl\Helper\ShortUrlStringifier;
|
||||
use Shlinkio\Shlink\Core\ShortUrl\Transformer\ShortUrlDataTransformer;
|
||||
use Throwable;
|
||||
|
||||
class NotifyNewShortUrlToRabbitMqTest extends TestCase
|
||||
@@ -29,22 +29,24 @@ class NotifyNewShortUrlToRabbitMqTest extends TestCase
|
||||
|
||||
private NotifyNewShortUrlToRabbitMq $listener;
|
||||
private ObjectProphecy $helper;
|
||||
private ObjectProphecy $updatesGenerator;
|
||||
private ObjectProphecy $em;
|
||||
private ObjectProphecy $logger;
|
||||
private RabbitMqOptions $options;
|
||||
|
||||
protected function setUp(): void
|
||||
{
|
||||
$this->helper = $this->prophesize(RabbitMqPublishingHelperInterface::class);
|
||||
$this->helper = $this->prophesize(PublishingHelperInterface::class);
|
||||
$this->updatesGenerator = $this->prophesize(PublishingUpdatesGeneratorInterface::class);
|
||||
$this->em = $this->prophesize(EntityManagerInterface::class);
|
||||
$this->logger = $this->prophesize(LoggerInterface::class);
|
||||
$this->options = new RabbitMqOptions(['enabled' => true]);
|
||||
|
||||
$this->listener = new NotifyNewShortUrlToRabbitMq(
|
||||
$this->helper->reveal(),
|
||||
$this->updatesGenerator->reveal(),
|
||||
$this->em->reveal(),
|
||||
$this->logger->reveal(),
|
||||
new ShortUrlDataTransformer(new ShortUrlStringifier([])),
|
||||
$this->options,
|
||||
);
|
||||
}
|
||||
@@ -59,7 +61,7 @@ class NotifyNewShortUrlToRabbitMqTest extends TestCase
|
||||
$this->em->find(Argument::cetera())->shouldNotHaveBeenCalled();
|
||||
$this->logger->warning(Argument::cetera())->shouldNotHaveBeenCalled();
|
||||
$this->logger->debug(Argument::cetera())->shouldNotHaveBeenCalled();
|
||||
$this->helper->publishPayloadInQueue(Argument::cetera())->shouldNotHaveBeenCalled();
|
||||
$this->helper->publishUpdate(Argument::cetera())->shouldNotHaveBeenCalled();
|
||||
}
|
||||
|
||||
/** @test */
|
||||
@@ -68,8 +70,8 @@ class NotifyNewShortUrlToRabbitMqTest extends TestCase
|
||||
$shortUrlId = '123';
|
||||
$find = $this->em->find(ShortUrl::class, $shortUrlId)->willReturn(null);
|
||||
$logWarning = $this->logger->warning(
|
||||
'Tried to notify RabbitMQ for new short URL with id "{shortUrlId}", but it does not exist.',
|
||||
['shortUrlId' => $shortUrlId],
|
||||
'Tried to notify {name} for new short URL with id "{shortUrlId}", but it does not exist.',
|
||||
['shortUrlId' => $shortUrlId, 'name' => 'RabbitMQ'],
|
||||
);
|
||||
|
||||
($this->listener)(new ShortUrlCreated($shortUrlId));
|
||||
@@ -77,22 +79,24 @@ class NotifyNewShortUrlToRabbitMqTest extends TestCase
|
||||
$find->shouldHaveBeenCalledOnce();
|
||||
$logWarning->shouldHaveBeenCalledOnce();
|
||||
$this->logger->debug(Argument::cetera())->shouldNotHaveBeenCalled();
|
||||
$this->helper->publishPayloadInQueue(Argument::cetera())->shouldNotHaveBeenCalled();
|
||||
$this->helper->publishUpdate(Argument::cetera())->shouldNotHaveBeenCalled();
|
||||
}
|
||||
|
||||
/** @test */
|
||||
public function expectedChannelIsNotified(): void
|
||||
{
|
||||
$shortUrlId = '123';
|
||||
$update = Update::forTopicAndPayload(Topic::NEW_SHORT_URL->value, []);
|
||||
$find = $this->em->find(ShortUrl::class, $shortUrlId)->willReturn(ShortUrl::withLongUrl(''));
|
||||
$generateUpdate = $this->updatesGenerator->newShortUrlUpdate(Argument::type(ShortUrl::class))->willReturn(
|
||||
$update,
|
||||
);
|
||||
|
||||
($this->listener)(new ShortUrlCreated($shortUrlId));
|
||||
|
||||
$find->shouldHaveBeenCalledOnce();
|
||||
$this->helper->publishPayloadInQueue(
|
||||
Argument::type('array'),
|
||||
Topic::NEW_SHORT_URL->value,
|
||||
)->shouldHaveBeenCalledOnce();
|
||||
$generateUpdate->shouldHaveBeenCalledOnce();
|
||||
$this->helper->publishUpdate($update)->shouldHaveBeenCalledOnce();
|
||||
$this->logger->debug(Argument::cetera())->shouldNotHaveBeenCalled();
|
||||
}
|
||||
|
||||
@@ -103,16 +107,21 @@ class NotifyNewShortUrlToRabbitMqTest extends TestCase
|
||||
public function printsDebugMessageInCaseOfError(Throwable $e): void
|
||||
{
|
||||
$shortUrlId = '123';
|
||||
$update = Update::forTopicAndPayload(Topic::NEW_SHORT_URL->value, []);
|
||||
$find = $this->em->find(ShortUrl::class, $shortUrlId)->willReturn(ShortUrl::withLongUrl(''));
|
||||
$publish = $this->helper->publishPayloadInQueue(Argument::cetera())->willThrow($e);
|
||||
$generateUpdate = $this->updatesGenerator->newShortUrlUpdate(Argument::type(ShortUrl::class))->willReturn(
|
||||
$update,
|
||||
);
|
||||
$publish = $this->helper->publishUpdate($update)->willThrow($e);
|
||||
|
||||
($this->listener)(new ShortUrlCreated($shortUrlId));
|
||||
|
||||
$this->logger->debug(
|
||||
'Error while trying to notify RabbitMQ with new short URL. {e}',
|
||||
['e' => $e],
|
||||
'Error while trying to notify {name} with new short URL. {e}',
|
||||
['e' => $e, 'name' => 'RabbitMQ'],
|
||||
)->shouldHaveBeenCalledOnce();
|
||||
$find->shouldHaveBeenCalledOnce();
|
||||
$generateUpdate->shouldHaveBeenCalledOnce();
|
||||
$publish->shouldHaveBeenCalledOnce();
|
||||
}
|
||||
|
||||
|
||||
@@ -14,21 +14,22 @@ use Prophecy\PhpUnit\ProphecyTrait;
|
||||
use Prophecy\Prophecy\ObjectProphecy;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use RuntimeException;
|
||||
use Shlinkio\Shlink\Common\RabbitMq\RabbitMqPublishingHelperInterface;
|
||||
use Shlinkio\Shlink\Common\UpdatePublishing\PublishingHelperInterface;
|
||||
use Shlinkio\Shlink\Common\UpdatePublishing\Update;
|
||||
use Shlinkio\Shlink\Core\Entity\ShortUrl;
|
||||
use Shlinkio\Shlink\Core\Entity\Visit;
|
||||
use Shlinkio\Shlink\Core\EventDispatcher\Event\VisitLocated;
|
||||
use Shlinkio\Shlink\Core\EventDispatcher\PublishingUpdatesGeneratorInterface;
|
||||
use Shlinkio\Shlink\Core\EventDispatcher\RabbitMq\NotifyVisitToRabbitMq;
|
||||
use Shlinkio\Shlink\Core\Model\ShortUrlMeta;
|
||||
use Shlinkio\Shlink\Core\Model\Visitor;
|
||||
use Shlinkio\Shlink\Core\Options\RabbitMqOptions;
|
||||
use Shlinkio\Shlink\Core\ShortUrl\Helper\ShortUrlStringifier;
|
||||
use Shlinkio\Shlink\Core\ShortUrl\Transformer\ShortUrlDataTransformer;
|
||||
use Shlinkio\Shlink\Core\Visit\Transformer\OrphanVisitDataTransformer;
|
||||
use Throwable;
|
||||
|
||||
use function count;
|
||||
use function Functional\contains;
|
||||
use function Functional\each;
|
||||
use function Functional\noop;
|
||||
|
||||
class NotifyVisitToRabbitMqTest extends TestCase
|
||||
{
|
||||
@@ -36,23 +37,25 @@ class NotifyVisitToRabbitMqTest extends TestCase
|
||||
|
||||
private NotifyVisitToRabbitMq $listener;
|
||||
private ObjectProphecy $helper;
|
||||
private ObjectProphecy $updatesGenerator;
|
||||
private ObjectProphecy $em;
|
||||
private ObjectProphecy $logger;
|
||||
private RabbitMqOptions $options;
|
||||
|
||||
protected function setUp(): void
|
||||
{
|
||||
$this->helper = $this->prophesize(RabbitMqPublishingHelperInterface::class);
|
||||
$this->helper = $this->prophesize(PublishingHelperInterface::class);
|
||||
$this->updatesGenerator = $this->prophesize(PublishingUpdatesGeneratorInterface::class);
|
||||
$this->em = $this->prophesize(EntityManagerInterface::class);
|
||||
$this->logger = $this->prophesize(LoggerInterface::class);
|
||||
$this->options = new RabbitMqOptions(['enabled' => true, 'legacy_visits_publishing' => true]);
|
||||
$this->options = new RabbitMqOptions(['enabled' => true, 'legacy_visits_publishing' => false]);
|
||||
|
||||
$this->listener = new NotifyVisitToRabbitMq(
|
||||
$this->helper->reveal(),
|
||||
$this->updatesGenerator->reveal(),
|
||||
$this->em->reveal(),
|
||||
$this->logger->reveal(),
|
||||
new OrphanVisitDataTransformer(),
|
||||
new ShortUrlDataTransformer(new ShortUrlStringifier([])),
|
||||
$this->options,
|
||||
);
|
||||
}
|
||||
@@ -67,7 +70,7 @@ class NotifyVisitToRabbitMqTest extends TestCase
|
||||
$this->em->find(Argument::cetera())->shouldNotHaveBeenCalled();
|
||||
$this->logger->warning(Argument::cetera())->shouldNotHaveBeenCalled();
|
||||
$this->logger->debug(Argument::cetera())->shouldNotHaveBeenCalled();
|
||||
$this->helper->publishPayloadInQueue(Argument::cetera())->shouldNotHaveBeenCalled();
|
||||
$this->helper->publishUpdate(Argument::cetera())->shouldNotHaveBeenCalled();
|
||||
}
|
||||
|
||||
/** @test */
|
||||
@@ -76,8 +79,8 @@ class NotifyVisitToRabbitMqTest extends TestCase
|
||||
$visitId = '123';
|
||||
$findVisit = $this->em->find(Visit::class, $visitId)->willReturn(null);
|
||||
$logWarning = $this->logger->warning(
|
||||
'Tried to notify RabbitMQ for visit with id "{visitId}", but it does not exist.',
|
||||
['visitId' => $visitId],
|
||||
'Tried to notify {name} for visit with id "{visitId}", but it does not exist.',
|
||||
['visitId' => $visitId, 'name' => 'RabbitMQ'],
|
||||
);
|
||||
|
||||
($this->listener)(new VisitLocated($visitId));
|
||||
@@ -85,7 +88,7 @@ class NotifyVisitToRabbitMqTest extends TestCase
|
||||
$findVisit->shouldHaveBeenCalledOnce();
|
||||
$logWarning->shouldHaveBeenCalledOnce();
|
||||
$this->logger->debug(Argument::cetera())->shouldNotHaveBeenCalled();
|
||||
$this->helper->publishPayloadInQueue(Argument::cetera())->shouldNotHaveBeenCalled();
|
||||
$this->helper->publishUpdate(Argument::cetera())->shouldNotHaveBeenCalled();
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -96,17 +99,18 @@ class NotifyVisitToRabbitMqTest extends TestCase
|
||||
{
|
||||
$visitId = '123';
|
||||
$findVisit = $this->em->find(Visit::class, $visitId)->willReturn($visit);
|
||||
$argumentWithExpectedChannels = Argument::that(
|
||||
static fn (string $channel) => contains($expectedChannels, $channel),
|
||||
);
|
||||
each($expectedChannels, function (string $method): void {
|
||||
$this->updatesGenerator->{$method}(Argument::type(Visit::class))->willReturn(
|
||||
Update::forTopicAndPayload('', []),
|
||||
)->shouldBeCalledOnce();
|
||||
});
|
||||
|
||||
($this->listener)(new VisitLocated($visitId));
|
||||
|
||||
$findVisit->shouldHaveBeenCalledOnce();
|
||||
$this->helper->publishPayloadInQueue(
|
||||
Argument::type('array'),
|
||||
$argumentWithExpectedChannels,
|
||||
)->shouldHaveBeenCalledTimes(count($expectedChannels));
|
||||
$this->helper->publishUpdate(Argument::type(Update::class))->shouldHaveBeenCalledTimes(
|
||||
count($expectedChannels),
|
||||
);
|
||||
$this->logger->debug(Argument::cetera())->shouldNotHaveBeenCalled();
|
||||
}
|
||||
|
||||
@@ -114,7 +118,7 @@ class NotifyVisitToRabbitMqTest extends TestCase
|
||||
{
|
||||
$visitor = Visitor::emptyInstance();
|
||||
|
||||
yield 'orphan visit' => [Visit::forBasePath($visitor), ['https://shlink.io/new-orphan-visit']];
|
||||
yield 'orphan visit' => [Visit::forBasePath($visitor), ['newOrphanVisitUpdate']];
|
||||
yield 'non-orphan visit' => [
|
||||
Visit::forValidShortUrl(
|
||||
ShortUrl::fromMeta(ShortUrlMeta::fromRawData([
|
||||
@@ -123,7 +127,7 @@ class NotifyVisitToRabbitMqTest extends TestCase
|
||||
])),
|
||||
$visitor,
|
||||
),
|
||||
['https://shlink.io/new-visit', 'https://shlink.io/new-visit/bar'],
|
||||
['newShortUrlVisitUpdate', 'newVisitUpdate'],
|
||||
];
|
||||
}
|
||||
|
||||
@@ -135,15 +139,19 @@ class NotifyVisitToRabbitMqTest extends TestCase
|
||||
{
|
||||
$visitId = '123';
|
||||
$findVisit = $this->em->find(Visit::class, $visitId)->willReturn(Visit::forBasePath(Visitor::emptyInstance()));
|
||||
$publish = $this->helper->publishPayloadInQueue(Argument::cetera())->willThrow($e);
|
||||
$generateUpdate = $this->updatesGenerator->newOrphanVisitUpdate(Argument::type(Visit::class))->willReturn(
|
||||
Update::forTopicAndPayload('', []),
|
||||
);
|
||||
$publish = $this->helper->publishUpdate(Argument::cetera())->willThrow($e);
|
||||
|
||||
($this->listener)(new VisitLocated($visitId));
|
||||
|
||||
$this->logger->debug(
|
||||
'Error while trying to notify RabbitMQ with new visit. {e}',
|
||||
['e' => $e],
|
||||
'Error while trying to notify {name} with new visit. {e}',
|
||||
['e' => $e, 'name' => 'RabbitMQ'],
|
||||
)->shouldHaveBeenCalledOnce();
|
||||
$findVisit->shouldHaveBeenCalledOnce();
|
||||
$generateUpdate->shouldHaveBeenCalledOnce();
|
||||
$publish->shouldHaveBeenCalledOnce();
|
||||
}
|
||||
|
||||
@@ -161,70 +169,81 @@ class NotifyVisitToRabbitMqTest extends TestCase
|
||||
public function expectedPayloadIsPublishedDependingOnConfig(
|
||||
bool $legacy,
|
||||
Visit $visit,
|
||||
callable $assertPayload,
|
||||
callable $assert,
|
||||
callable $setup,
|
||||
): void {
|
||||
$this->options->legacyVisitsPublishing = $legacy;
|
||||
|
||||
$visitId = '123';
|
||||
$findVisit = $this->em->find(Visit::class, $visitId)->willReturn($visit);
|
||||
$setup($this->updatesGenerator);
|
||||
|
||||
($this->listener)(new VisitLocated($visitId));
|
||||
|
||||
$findVisit->shouldHaveBeenCalledOnce();
|
||||
$this->helper->publishPayloadInQueue(Argument::that($assertPayload), Argument::type('string'))
|
||||
->shouldHaveBeenCalled();
|
||||
$assert($this->helper, $this->updatesGenerator);
|
||||
}
|
||||
|
||||
public function provideLegacyPayloads(): iterable
|
||||
{
|
||||
yield 'non-legacy non-orphan visit' => [
|
||||
yield 'legacy non-orphan visit' => [
|
||||
true,
|
||||
$visit = Visit::forValidShortUrl(ShortUrl::withLongUrl(''), Visitor::emptyInstance()),
|
||||
function (array $payload) use ($visit): bool {
|
||||
Assert::assertEquals($payload, $visit->jsonSerialize());
|
||||
Assert::assertArrayNotHasKey('visitedUrl', $payload);
|
||||
Assert::assertArrayNotHasKey('type', $payload);
|
||||
Assert::assertArrayNotHasKey('visit', $payload);
|
||||
Assert::assertArrayNotHasKey('shortUrl', $payload);
|
||||
function (ObjectProphecy|PublishingHelperInterface $helper) use ($visit): void {
|
||||
$helper->publishUpdate(Argument::that(function (Update $update) use ($visit): bool {
|
||||
$payload = $update->payload;
|
||||
Assert::assertEquals($payload, $visit->jsonSerialize());
|
||||
Assert::assertArrayNotHasKey('visitedUrl', $payload);
|
||||
Assert::assertArrayNotHasKey('type', $payload);
|
||||
Assert::assertArrayNotHasKey('visit', $payload);
|
||||
Assert::assertArrayNotHasKey('shortUrl', $payload);
|
||||
|
||||
return true;
|
||||
return true;
|
||||
}));
|
||||
},
|
||||
noop(...),
|
||||
];
|
||||
yield 'legacy orphan visit' => [
|
||||
true,
|
||||
Visit::forBasePath(Visitor::emptyInstance()),
|
||||
function (ObjectProphecy|PublishingHelperInterface $helper): void {
|
||||
$helper->publishUpdate(Argument::that(function (Update $update): bool {
|
||||
$payload = $update->payload;
|
||||
Assert::assertArrayHasKey('visitedUrl', $payload);
|
||||
Assert::assertArrayHasKey('type', $payload);
|
||||
|
||||
return true;
|
||||
}));
|
||||
},
|
||||
noop(...),
|
||||
];
|
||||
yield 'non-legacy non-orphan visit' => [
|
||||
false,
|
||||
Visit::forValidShortUrl(ShortUrl::withLongUrl(''), Visitor::emptyInstance()),
|
||||
function (ObjectProphecy|PublishingHelperInterface $helper): void {
|
||||
$helper->publishUpdate(Argument::type(Update::class))->shouldHaveBeenCalledTimes(2);
|
||||
},
|
||||
function (ObjectProphecy|PublishingUpdatesGeneratorInterface $updatesGenerator): void {
|
||||
$update = Update::forTopicAndPayload('', []);
|
||||
$updatesGenerator->newOrphanVisitUpdate(Argument::cetera())->shouldNotBeCalled();
|
||||
$updatesGenerator->newVisitUpdate(Argument::cetera())->willReturn($update)
|
||||
->shouldBeCalledOnce();
|
||||
$updatesGenerator->newShortUrlVisitUpdate(Argument::cetera())->willReturn($update)
|
||||
->shouldBeCalledOnce();
|
||||
},
|
||||
];
|
||||
yield 'non-legacy orphan visit' => [
|
||||
true,
|
||||
Visit::forBasePath(Visitor::emptyInstance()),
|
||||
function (array $payload): bool {
|
||||
Assert::assertArrayHasKey('visitedUrl', $payload);
|
||||
Assert::assertArrayHasKey('type', $payload);
|
||||
|
||||
return true;
|
||||
},
|
||||
];
|
||||
yield 'legacy non-orphan visit' => [
|
||||
false,
|
||||
$visit = Visit::forValidShortUrl(ShortUrl::withLongUrl(''), Visitor::emptyInstance()),
|
||||
function (array $payload) use ($visit): bool {
|
||||
Assert::assertArrayHasKey('visit', $payload);
|
||||
Assert::assertArrayHasKey('shortUrl', $payload);
|
||||
Assert::assertIsArray($payload['visit']);
|
||||
Assert::assertEquals($payload['visit'], $visit->jsonSerialize());
|
||||
Assert::assertArrayNotHasKey('visitedUrl', ['visit']);
|
||||
Assert::assertArrayNotHasKey('type', ['visit']);
|
||||
|
||||
return true;
|
||||
},
|
||||
];
|
||||
yield 'legacy orphan visit' => [
|
||||
false,
|
||||
Visit::forBasePath(Visitor::emptyInstance()),
|
||||
function (array $payload): bool {
|
||||
Assert::assertArrayHasKey('visit', $payload);
|
||||
Assert::assertArrayNotHasKey('shortUrl', $payload);
|
||||
Assert::assertIsArray($payload['visit']);
|
||||
Assert::assertArrayHasKey('visitedUrl', $payload['visit']);
|
||||
Assert::assertArrayHasKey('type', $payload['visit']);
|
||||
|
||||
return true;
|
||||
function (ObjectProphecy|PublishingHelperInterface $helper): void {
|
||||
$helper->publishUpdate(Argument::type(Update::class))->shouldHaveBeenCalledOnce();
|
||||
},
|
||||
function (ObjectProphecy|PublishingUpdatesGeneratorInterface $updatesGenerator): void {
|
||||
$update = Update::forTopicAndPayload('', []);
|
||||
$updatesGenerator->newOrphanVisitUpdate(Argument::cetera())->willReturn($update)
|
||||
->shouldBeCalledOnce();
|
||||
$updatesGenerator->newVisitUpdate(Argument::cetera())->shouldNotBeCalled();
|
||||
$updatesGenerator->newShortUrlVisitUpdate(Argument::cetera())->shouldNotBeCalled();
|
||||
},
|
||||
];
|
||||
}
|
||||
|
||||
@@ -0,0 +1,95 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace ShlinkioTest\Shlink\Core\EventDispatcher\RedisPubSub;
|
||||
|
||||
use Doctrine\ORM\EntityManagerInterface;
|
||||
use DomainException;
|
||||
use Exception;
|
||||
use PHPUnit\Framework\TestCase;
|
||||
use Prophecy\Argument;
|
||||
use Prophecy\PhpUnit\ProphecyTrait;
|
||||
use Prophecy\Prophecy\ObjectProphecy;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use RuntimeException;
|
||||
use Shlinkio\Shlink\Common\UpdatePublishing\PublishingHelperInterface;
|
||||
use Shlinkio\Shlink\Common\UpdatePublishing\Update;
|
||||
use Shlinkio\Shlink\Core\Entity\ShortUrl;
|
||||
use Shlinkio\Shlink\Core\EventDispatcher\Event\ShortUrlCreated;
|
||||
use Shlinkio\Shlink\Core\EventDispatcher\PublishingUpdatesGeneratorInterface;
|
||||
use Shlinkio\Shlink\Core\EventDispatcher\RedisPubSub\NotifyNewShortUrlToRedis;
|
||||
use Shlinkio\Shlink\Core\EventDispatcher\Topic;
|
||||
use Throwable;
|
||||
|
||||
class NotifyNewShortUrlToRedisTest extends TestCase
|
||||
{
|
||||
use ProphecyTrait;
|
||||
|
||||
private ObjectProphecy $helper;
|
||||
private ObjectProphecy $updatesGenerator;
|
||||
private ObjectProphecy $em;
|
||||
private ObjectProphecy $logger;
|
||||
|
||||
protected function setUp(): void
|
||||
{
|
||||
$this->helper = $this->prophesize(PublishingHelperInterface::class);
|
||||
$this->updatesGenerator = $this->prophesize(PublishingUpdatesGeneratorInterface::class);
|
||||
$this->em = $this->prophesize(EntityManagerInterface::class);
|
||||
$this->logger = $this->prophesize(LoggerInterface::class);
|
||||
}
|
||||
|
||||
/** @test */
|
||||
public function doesNothingWhenTheFeatureIsNotEnabled(): void
|
||||
{
|
||||
$this->createListener(false)(new ShortUrlCreated('123'));
|
||||
|
||||
$this->em->find(Argument::cetera())->shouldNotHaveBeenCalled();
|
||||
$this->logger->warning(Argument::cetera())->shouldNotHaveBeenCalled();
|
||||
$this->logger->debug(Argument::cetera())->shouldNotHaveBeenCalled();
|
||||
$this->helper->publishUpdate(Argument::cetera())->shouldNotHaveBeenCalled();
|
||||
}
|
||||
|
||||
/**
|
||||
* @test
|
||||
* @dataProvider provideExceptions
|
||||
*/
|
||||
public function printsDebugMessageInCaseOfError(Throwable $e): void
|
||||
{
|
||||
$shortUrlId = '123';
|
||||
$update = Update::forTopicAndPayload(Topic::NEW_SHORT_URL->value, []);
|
||||
$find = $this->em->find(ShortUrl::class, $shortUrlId)->willReturn(ShortUrl::withLongUrl(''));
|
||||
$generateUpdate = $this->updatesGenerator->newShortUrlUpdate(Argument::type(ShortUrl::class))->willReturn(
|
||||
$update,
|
||||
);
|
||||
$publish = $this->helper->publishUpdate($update)->willThrow($e);
|
||||
|
||||
$this->createListener()(new ShortUrlCreated($shortUrlId));
|
||||
|
||||
$this->logger->debug(
|
||||
'Error while trying to notify {name} with new short URL. {e}',
|
||||
['e' => $e, 'name' => 'Redis pub/sub'],
|
||||
)->shouldHaveBeenCalledOnce();
|
||||
$find->shouldHaveBeenCalledOnce();
|
||||
$generateUpdate->shouldHaveBeenCalledOnce();
|
||||
$publish->shouldHaveBeenCalledOnce();
|
||||
}
|
||||
|
||||
public function provideExceptions(): iterable
|
||||
{
|
||||
yield [new RuntimeException('RuntimeException Error')];
|
||||
yield [new Exception('Exception Error')];
|
||||
yield [new DomainException('DomainException Error')];
|
||||
}
|
||||
|
||||
private function createListener(bool $enabled = true): NotifyNewShortUrlToRedis
|
||||
{
|
||||
return new NotifyNewShortUrlToRedis(
|
||||
$this->helper->reveal(),
|
||||
$this->updatesGenerator->reveal(),
|
||||
$this->em->reveal(),
|
||||
$this->logger->reveal(),
|
||||
$enabled,
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,94 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace ShlinkioTest\Shlink\Core\EventDispatcher\RedisPubSub;
|
||||
|
||||
use Doctrine\ORM\EntityManagerInterface;
|
||||
use DomainException;
|
||||
use Exception;
|
||||
use PHPUnit\Framework\TestCase;
|
||||
use Prophecy\Argument;
|
||||
use Prophecy\PhpUnit\ProphecyTrait;
|
||||
use Prophecy\Prophecy\ObjectProphecy;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use RuntimeException;
|
||||
use Shlinkio\Shlink\Common\UpdatePublishing\PublishingHelperInterface;
|
||||
use Shlinkio\Shlink\Common\UpdatePublishing\Update;
|
||||
use Shlinkio\Shlink\Core\Entity\Visit;
|
||||
use Shlinkio\Shlink\Core\EventDispatcher\Event\VisitLocated;
|
||||
use Shlinkio\Shlink\Core\EventDispatcher\PublishingUpdatesGeneratorInterface;
|
||||
use Shlinkio\Shlink\Core\EventDispatcher\RedisPubSub\NotifyVisitToRedis;
|
||||
use Shlinkio\Shlink\Core\Model\Visitor;
|
||||
use Throwable;
|
||||
|
||||
class NotifyVisitToRedisTest extends TestCase
|
||||
{
|
||||
use ProphecyTrait;
|
||||
|
||||
private ObjectProphecy $helper;
|
||||
private ObjectProphecy $updatesGenerator;
|
||||
private ObjectProphecy $em;
|
||||
private ObjectProphecy $logger;
|
||||
|
||||
protected function setUp(): void
|
||||
{
|
||||
$this->helper = $this->prophesize(PublishingHelperInterface::class);
|
||||
$this->updatesGenerator = $this->prophesize(PublishingUpdatesGeneratorInterface::class);
|
||||
$this->em = $this->prophesize(EntityManagerInterface::class);
|
||||
$this->logger = $this->prophesize(LoggerInterface::class);
|
||||
}
|
||||
|
||||
/** @test */
|
||||
public function doesNothingWhenTheFeatureIsNotEnabled(): void
|
||||
{
|
||||
$this->createListener(false)(new VisitLocated('123'));
|
||||
|
||||
$this->em->find(Argument::cetera())->shouldNotHaveBeenCalled();
|
||||
$this->logger->warning(Argument::cetera())->shouldNotHaveBeenCalled();
|
||||
$this->logger->debug(Argument::cetera())->shouldNotHaveBeenCalled();
|
||||
$this->helper->publishUpdate(Argument::cetera())->shouldNotHaveBeenCalled();
|
||||
}
|
||||
|
||||
/**
|
||||
* @test
|
||||
* @dataProvider provideExceptions
|
||||
*/
|
||||
public function printsDebugMessageInCaseOfError(Throwable $e): void
|
||||
{
|
||||
$visitId = '123';
|
||||
$findVisit = $this->em->find(Visit::class, $visitId)->willReturn(Visit::forBasePath(Visitor::emptyInstance()));
|
||||
$generateUpdate = $this->updatesGenerator->newOrphanVisitUpdate(Argument::type(Visit::class))->willReturn(
|
||||
Update::forTopicAndPayload('', []),
|
||||
);
|
||||
$publish = $this->helper->publishUpdate(Argument::cetera())->willThrow($e);
|
||||
|
||||
$this->createListener()(new VisitLocated($visitId));
|
||||
|
||||
$this->logger->debug(
|
||||
'Error while trying to notify {name} with new visit. {e}',
|
||||
['e' => $e, 'name' => 'Redis pub/sub'],
|
||||
)->shouldHaveBeenCalledOnce();
|
||||
$findVisit->shouldHaveBeenCalledOnce();
|
||||
$generateUpdate->shouldHaveBeenCalledOnce();
|
||||
$publish->shouldHaveBeenCalledOnce();
|
||||
}
|
||||
|
||||
public function provideExceptions(): iterable
|
||||
{
|
||||
yield [new RuntimeException('RuntimeException Error')];
|
||||
yield [new Exception('Exception Error')];
|
||||
yield [new DomainException('DomainException Error')];
|
||||
}
|
||||
|
||||
private function createListener(bool $enabled = true): NotifyVisitToRedis
|
||||
{
|
||||
return new NotifyVisitToRedis(
|
||||
$this->helper->reveal(),
|
||||
$this->updatesGenerator->reveal(),
|
||||
$this->em->reveal(),
|
||||
$this->logger->reveal(),
|
||||
$enabled,
|
||||
);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user