Skip to content

chore(messenger): use Messenger instrumentation #173

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@
"open-telemetry/sdk": "^1",
"open-telemetry/sem-conv": "^1",
"open-telemetry/symfony-sdk-bundle": "^0",
"symfony/amqp-messenger": "^7.2",
"symfony/config": "^7.2",
"symfony/dependency-injection": "^7.2",
"symfony/event-dispatcher": "^7.2",
"symfony/messenger": "^7.2",
"zenstruck/dsn": "^0.2",
"zenstruck/uri": "^2.3"
},
Expand Down Expand Up @@ -67,7 +69,6 @@
"symfony/http-client": "^7.2",
"symfony/http-kernel": "^7.2",
"symfony/mailer": "^7.2",
"symfony/messenger": "^7.2",
"symfony/monolog-bundle": "^3.10",
"symfony/phpunit-bridge": "^7.2",
"symfony/runtime": "^7.2",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ public function process(ContainerBuilder $container): void

if ($container->hasParameter('open_telemetry.instrumentation.messenger.type')) {
$messengerInstrumentationType = $container->getParameter('open_telemetry.instrumentation.messenger.type');
if ($container->hasDefinition('open_telemetry.instrumentation.http_kernel.trace.event_subscriber')) {

if ($container->hasDefinition('open_telemetry.instrumentation.messenger.worker')) {
$container->getDefinition('open_telemetry.instrumentation.messenger.worker')
->addMethodCall('setInstrumentationType', [$messengerInstrumentationType]);
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/DependencyInjection/OpenTelemetryExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ private function registerMessengerTracingInstrumentationConfiguration(ContainerB
$container->removeDefinition('open_telemetry.instrumentation.messenger.trace.transport');
$container->removeDefinition('open_telemetry.instrumentation.messenger.trace.transport_factory');
$container->removeDefinition('open_telemetry.instrumentation.messenger.trace.middleware');
$container->removeDefinition('open_telemetry.instrumentation.messenger.worker');
}

$this->setTracingInstrumentationParams($container, 'messenger', $config, $isConfigEnabled);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ final class TraceableConsoleEventSubscriber implements EventSubscriberInterface,
*/
private array $excludeCommands = [];

/**
* @var string[]
*/
private array $notSupportedCommands = [
'messenger:consume', // designed to run indefinitely
];

public function __construct(
private readonly TracerInterface $tracer,
/** @var ServiceLocator<TracerInterface> */
Expand All @@ -56,6 +63,9 @@ public static function getSubscribedEvents(): array
];
}

/**
* @return class-string[]
*/
public static function getSubscribedServices(): array
{
return [TracerInterface::class];
Expand All @@ -67,6 +77,10 @@ public function startSpan(ConsoleCommandEvent $event): void

assert($command instanceof Command);

if ($this->isNotSupported($command)) {
return;
}

if (false === $this->isAutoTraceable($command) && false === $this->isAttributeTraceable($command)) {
return;
}
Expand Down Expand Up @@ -173,6 +187,11 @@ private function isAutoTraceable(Command $command): bool
return true;
}

private function isNotSupported(Command $command): bool
{
return in_array($command->getName(), $this->notSupportedCommands, true);
}

private function isAttributeTraceable(Command $command): bool
{
$traceable = $this->parseAttribute($command);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
<?php

namespace FriendsOfOpenTelemetry\OpenTelemetryBundle\Instrumentation\Symfony\Messenger;

use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Context\Propagator\TraceStampPropagator;
use OpenTelemetry\Context\Context;
use OpenTelemetry\Context\Propagation\MultiTextMapPropagator;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;

/**
* Be aware the app consuming the message must be able to denormalize the stamp.
*/
readonly class AddStampForPropagationMiddleware implements MiddlewareInterface
{
public function __construct(
private MultiTextMapPropagator $propagator,
private ?LoggerInterface $logger = null,
) {
}

public function handle(Envelope $envelope, StackInterface $stack): Envelope
{
$traceStamp = $envelope->last(TraceStamp::class);

if (null !== $traceStamp) {
return $stack->next()->handle($envelope, $stack);
}

$scope = Context::storage()->scope();

if (null !== $scope) {
$this->propagator->inject($envelope, new TraceStampPropagator($this->logger), Context::getCurrent());
}

return $stack->next()->handle($envelope, $stack);
}
}
23 changes: 23 additions & 0 deletions src/Instrumentation/Symfony/Messenger/TraceStamp.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<?php

namespace FriendsOfOpenTelemetry\OpenTelemetryBundle\Instrumentation\Symfony\Messenger;

use Symfony\Component\Messenger\Stamp\StampInterface;

/**
* @doc: https://www.w3.org/TR/trace-context/
*
* You can see how the trace parent generated here: https://github.com/open-telemetry/opentelemetry-php/blob/main/src/API/Trace/Propagation/TraceContextPropagator.php
*/
readonly class TraceStamp implements StampInterface
{
public function __construct(
private string $traceParent,
) {
}

public function getTraceParent(): string
{
return $this->traceParent;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
<?php

namespace FriendsOfOpenTelemetry\OpenTelemetryBundle\Instrumentation\Symfony\Messenger;

use FriendsOfOpenTelemetry\OpenTelemetryBundle\Instrumentation\InstrumentationTypeEnum;
use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Context\Propagator\TraceStampPropagator;
use OpenTelemetry\API\Trace\SpanKind;
use OpenTelemetry\API\Trace\StatusCode;
use OpenTelemetry\API\Trace\TracerInterface;
use OpenTelemetry\Context\Context;
use OpenTelemetry\Context\Propagation\MultiTextMapPropagator;
use OpenTelemetry\SDK\Trace\Span;
use Psr\Log\LoggerInterface;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
use Symfony\Component\Messenger\Stamp\BusNameStamp;

/**
* Be aware if you start a span before this subscriber, it could leads to orphan span issue.
* Be sure your span is properly ended.
*/
class WorkerMessageEventSubscriber implements EventSubscriberInterface
{
private ?InstrumentationTypeEnum $instrumentationType = null;

public function __construct(
private readonly MultiTextMapPropagator $propagator,
private readonly TracerInterface $tracer,
private readonly LoggerInterface $logger,
) {
}

public function setInstrumentationType(InstrumentationTypeEnum $instrumentationType): void
{
$this->instrumentationType = $instrumentationType;
}

public static function getSubscribedEvents()
{
return [
WorkerMessageReceivedEvent::class => ['startSpan'],
WorkerMessageFailedEvent::class => ['endSpanOnError'],
WorkerMessageHandledEvent::class => ['endSpanWithSuccess'],
];
}

public function startSpan(WorkerMessageReceivedEvent $event): void
{
if (InstrumentationTypeEnum::Auto !== $this->instrumentationType) {
return;
}

// ensure propagation from incoming trace
$context = $this->propagator->extract($event->getEnvelope(), new TraceStampPropagator($this->logger));

$scope = Context::storage()->scope();

if (null !== $scope) {
$this->logger->debug(sprintf('Using scope "%s"', spl_object_id($scope)));
} else {
$this->logger->debug('No active scope');
}

$span = $this->tracer
->spanBuilder($event->getReceiverName())
->setParent($context)
->setSpanKind(SpanKind::KIND_CONSUMER)
->startSpan();

$busNameStamp = $event->getEnvelope()->last(BusNameStamp::class);

if (null !== $busNameStamp) {
$span->setAttribute('bus.name', $busNameStamp->getBusName());
}

$this->logger->debug(sprintf('Starting span "%s"', $span->getContext()->getSpanId()));

Context::storage()
->attach(
$span->storeInContext($context)
)
;
}

public function endSpanWithSuccess(WorkerMessageHandledEvent $event): void
{
if (InstrumentationTypeEnum::Auto !== $this->instrumentationType) {
return;
}

$scope = Context::storage()->scope();

if (null === $scope) {
return;
}

$scope->detach();

$span = Span::fromContext($scope->context());
$span->setStatus(StatusCode::STATUS_OK);
$this->logger->debug(sprintf('Ending span "%s"', $span->getContext()->getSpanId()));
$span->end();
}

public function endSpanOnError(WorkerMessageFailedEvent $event): void
{
if (InstrumentationTypeEnum::Auto !== $this->instrumentationType) {
return;
}

$scope = Context::storage()->scope();

if (null === $scope) {
return;
}

$scope->detach();

$span = Span::fromContext($scope->context());
$span->setStatus(StatusCode::STATUS_ERROR);
$span->setAttribute('exception.message', $event->getThrowable()->getMessage());
$previous = $event->getThrowable()->getPrevious();

if (null !== $previous) {
$span->setAttribute('exception.previous.message', $previous->getMessage());
}

$this->logger->debug(sprintf('Ending span "%s"', $span->getContext()->getSpanId()));
$span->end();
}
}
21 changes: 21 additions & 0 deletions src/OpenTelemetry/Context/Propagator/PropagatorFactory.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?php

namespace FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Context\Propagator;

use OpenTelemetry\API\Baggage\Propagation\BaggagePropagator;
use OpenTelemetry\API\Trace\Propagation\TraceContextPropagator;
use OpenTelemetry\Context\Propagation\MultiTextMapPropagator;

class PropagatorFactory
{
/**
* Default propagators from https://opentelemetry.io/docs/specs/otel/configuration/sdk-environment-variables/#general-sdk-configuration.
*/
public static function createDefault(): MultiTextMapPropagator
{
return new MultiTextMapPropagator([
BaggagePropagator::getInstance(),
TraceContextPropagator::getInstance(),
]);
}
}
63 changes: 63 additions & 0 deletions src/OpenTelemetry/Context/Propagator/TraceStampPropagator.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
<?php

namespace FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Context\Propagator;

use FriendsOfOpenTelemetry\OpenTelemetryBundle\Instrumentation\Symfony\Messenger\TraceStamp;
use OpenTelemetry\API\Trace\Propagation\TraceContextPropagator;
use OpenTelemetry\Context\Propagation\PropagationGetterInterface;
use OpenTelemetry\Context\Propagation\PropagationSetterInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Envelope;

readonly class TraceStampPropagator implements PropagationSetterInterface, PropagationGetterInterface
{
public function __construct(
private ?LoggerInterface $logger = null,
) {
}

public function set(&$carrier, string $key, string $value): void
{
if (!$carrier instanceof Envelope) {
throw new \InvalidArgumentException(sprintf('The carrier for trace stamp propagation must be instance of %s', Envelope::class));
}

if (TraceContextPropagator::TRACEPARENT !== $key) {
return;
}

$carrier = $carrier->with(new TraceStamp($value));
$this->logger?->debug("Trace stamp added to envelope for propagation with value: $value");
}

public function keys($carrier): array
{
if (!$carrier instanceof Envelope) {
throw new \InvalidArgumentException(sprintf('The carrier for trace stamp propagation must be instance of %s', Envelope::class));
}

return [TraceContextPropagator::TRACEPARENT];
}

public function get($carrier, string $key): ?string
{
if (!$carrier instanceof Envelope) {
throw new \InvalidArgumentException(sprintf('The carrier for trace stamp propagation must be instance of %s', Envelope::class));
}

if (TraceContextPropagator::TRACEPARENT !== $key) {
return null;
}

$traceStamp = $carrier->last(TraceStamp::class);

if (null === $traceStamp) {
return null;
}

$traceParent = $traceStamp->getTraceParent();
$this->logger?->debug("Get trace parent from TraceStamp with value: $traceParent");

return $traceParent;
}
}
2 changes: 2 additions & 0 deletions src/Resources/config/services.php
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
<?php

use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Context\Propagator\HeadersPropagator as HeadersPropagationGetter;
use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Context\Propagator\PropagatorFactory;
use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Exporter\ExporterDsn;
use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Exporter\OtlpExporterOptions;
use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Resource\ResourceInfoFactory;
Expand Down Expand Up @@ -33,6 +34,7 @@

->set('open_telemetry.propagator_text_map.noop', NoopTextMapPropagator::class)
->set('open_telemetry.propagator_text_map.multi', MultiTextMapPropagator::class)
->factory([PropagatorFactory::class, 'createDefault'])

->set('open_telemetry.propagation_getter.headers', HeadersPropagationGetter::class)
->set('open_telemetry.propagation_getter.sanitize_combined_headers', SanitizeCombinedHeadersPropagationGetter::class)
Expand Down
Loading