Updated to latest common, with unified publishing API

This commit is contained in:
Alejandro Celaya
2022-07-26 12:07:27 +02:00
parent 233bb603cf
commit 791d6b7e57
7 changed files with 58 additions and 47 deletions

View File

@@ -6,8 +6,9 @@ namespace Shlinkio\Shlink\Core\EventDispatcher\RabbitMq;
use Doctrine\ORM\EntityManagerInterface;
use Psr\Log\LoggerInterface;
use Shlinkio\Shlink\Common\RabbitMq\RabbitMqPublishingHelperInterface;
use Shlinkio\Shlink\Common\Rest\DataTransformerInterface;
use Shlinkio\Shlink\Common\UpdatePublishing\PublishingHelperInterface;
use Shlinkio\Shlink\Common\UpdatePublishing\Update;
use Shlinkio\Shlink\Core\Entity\ShortUrl;
use Shlinkio\Shlink\Core\EventDispatcher\Event\ShortUrlCreated;
use Shlinkio\Shlink\Core\EventDispatcher\Topic;
@@ -17,7 +18,7 @@ use Throwable;
class NotifyNewShortUrlToRabbitMq
{
public function __construct(
private readonly RabbitMqPublishingHelperInterface $rabbitMqHelper,
private readonly PublishingHelperInterface $rabbitMqHelper,
private readonly EntityManagerInterface $em,
private readonly LoggerInterface $logger,
private readonly DataTransformerInterface $shortUrlTransformer,
@@ -43,10 +44,10 @@ class NotifyNewShortUrlToRabbitMq
}
try {
$this->rabbitMqHelper->publishPayloadInQueue(
['shortUrl' => $this->shortUrlTransformer->transform($shortUrl)],
$this->rabbitMqHelper->publishUpdate(Update::forTopicAndPayload(
Topic::NEW_SHORT_URL->value,
);
['shortUrl' => $this->shortUrlTransformer->transform($shortUrl)],
));
} catch (Throwable $e) {
$this->logger->debug('Error while trying to notify RabbitMQ with new short URL. {e}', ['e' => $e]);
}

View File

@@ -6,8 +6,9 @@ namespace Shlinkio\Shlink\Core\EventDispatcher\RabbitMq;
use Doctrine\ORM\EntityManagerInterface;
use Psr\Log\LoggerInterface;
use Shlinkio\Shlink\Common\RabbitMq\RabbitMqPublishingHelperInterface;
use Shlinkio\Shlink\Common\Rest\DataTransformerInterface;
use Shlinkio\Shlink\Common\UpdatePublishing\PublishingHelperInterface;
use Shlinkio\Shlink\Common\UpdatePublishing\Update;
use Shlinkio\Shlink\Core\Entity\Visit;
use Shlinkio\Shlink\Core\EventDispatcher\Event\VisitLocated;
use Shlinkio\Shlink\Core\EventDispatcher\Topic;
@@ -19,7 +20,7 @@ use function Functional\each;
class NotifyVisitToRabbitMq
{
public function __construct(
private readonly RabbitMqPublishingHelperInterface $rabbitMqHelper,
private readonly PublishingHelperInterface $rabbitMqHelper,
private readonly EntityManagerInterface $em,
private readonly LoggerInterface $logger,
private readonly DataTransformerInterface $orphanVisitTransformer,
@@ -48,7 +49,9 @@ class NotifyVisitToRabbitMq
$payload = $this->visitToPayload($visit);
try {
each($queues, fn (string $queue) => $this->rabbitMqHelper->publishPayloadInQueue($payload, $queue));
each($queues, fn (string $queue) => $this->rabbitMqHelper->publishUpdate(
Update::forTopicAndPayload($queue, $payload),
));
} catch (Throwable $e) {
$this->logger->debug('Error while trying to notify RabbitMQ with new visit. {e}', ['e' => $e]);
}

View File

@@ -6,8 +6,9 @@ namespace Shlinkio\Shlink\Core\EventDispatcher\RedisPubSub;
use Doctrine\ORM\EntityManagerInterface;
use Psr\Log\LoggerInterface;
use Shlinkio\Shlink\Common\Cache\RedisPublishingHelperInterface;
use Shlinkio\Shlink\Common\Rest\DataTransformerInterface;
use Shlinkio\Shlink\Common\UpdatePublishing\PublishingHelperInterface;
use Shlinkio\Shlink\Common\UpdatePublishing\Update;
use Shlinkio\Shlink\Core\Entity\ShortUrl;
use Shlinkio\Shlink\Core\EventDispatcher\Event\ShortUrlCreated;
use Shlinkio\Shlink\Core\EventDispatcher\Topic;
@@ -16,7 +17,7 @@ use Throwable;
class NotifyNewShortUrlToRedis
{
public function __construct(
private readonly RedisPublishingHelperInterface $redisHelper,
private readonly PublishingHelperInterface $redisHelper,
private readonly EntityManagerInterface $em,
private readonly LoggerInterface $logger,
private readonly DataTransformerInterface $shortUrlTransformer,
@@ -42,10 +43,10 @@ class NotifyNewShortUrlToRedis
}
try {
$this->redisHelper->publishPayloadInQueue(
['shortUrl' => $this->shortUrlTransformer->transform($shortUrl)],
$this->redisHelper->publishUpdate(Update::forTopicAndPayload(
Topic::NEW_SHORT_URL->value,
);
['shortUrl' => $this->shortUrlTransformer->transform($shortUrl)],
));
} catch (Throwable $e) {
$this->logger->debug('Error while trying to notify Redis pub/sub with new short URL. {e}', ['e' => $e]);
}

View File

@@ -6,8 +6,9 @@ namespace Shlinkio\Shlink\Core\EventDispatcher\RedisPubSub;
use Doctrine\ORM\EntityManagerInterface;
use Psr\Log\LoggerInterface;
use Shlinkio\Shlink\Common\Cache\RedisPublishingHelperInterface;
use Shlinkio\Shlink\Common\Rest\DataTransformerInterface;
use Shlinkio\Shlink\Common\UpdatePublishing\PublishingHelperInterface;
use Shlinkio\Shlink\Common\UpdatePublishing\Update;
use Shlinkio\Shlink\Core\Entity\Visit;
use Shlinkio\Shlink\Core\EventDispatcher\Event\VisitLocated;
use Shlinkio\Shlink\Core\EventDispatcher\Topic;
@@ -18,7 +19,7 @@ use function Functional\each;
class NotifyVisitToRedis
{
public function __construct(
private readonly RedisPublishingHelperInterface $redisHelper,
private readonly PublishingHelperInterface $redisHelper,
private readonly EntityManagerInterface $em,
private readonly LoggerInterface $logger,
private readonly DataTransformerInterface $orphanVisitTransformer,
@@ -48,7 +49,9 @@ class NotifyVisitToRedis
$payload = $this->visitToPayload($visit);
try {
each($queues, fn (string $queue) => $this->redisHelper->publishPayloadInQueue($payload, $queue));
each($queues, fn (string $queue) => $this->redisHelper->publishUpdate(
Update::forTopicAndPayload($queue, $payload),
));
} catch (Throwable $e) {
$this->logger->debug('Error while trying to notify Redis pub/sub with new visit. {e}', ['e' => $e]);
}