Skip to content

Support adding services while ServiceGroup is running #199

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 137 additions & 25 deletions Sources/ServiceLifecycle/ServiceGroup.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import Logging
import UnixSignals
import AsyncAlgorithms

/// A ``ServiceGroup`` is responsible for running a number of services, setting up signal handling and signalling graceful shutdown to the services.
public actor ServiceGroup: Sendable, Service {
Expand All @@ -23,7 +24,8 @@ public actor ServiceGroup: Sendable, Service {
case initial(services: [ServiceGroupConfiguration.ServiceConfiguration])
/// The state once ``ServiceGroup/run()`` has been called.
case running(
gracefulShutdownStreamContinuation: AsyncStream<Void>.Continuation
gracefulShutdownStreamContinuation: AsyncStream<Void>.Continuation,
addedServiceChannel: AsyncChannel<ServiceGroupConfiguration.ServiceConfiguration>
)
/// The state once ``ServiceGroup/run()`` has finished.
case finished
Expand Down Expand Up @@ -106,6 +108,38 @@ public actor ServiceGroup: Sendable, Service {
self.maximumCancellationDuration = configuration._maximumCancellationDuration
}

/// Adds a new service to the group.
///
/// If the group is currently running, the added service will be started immediately.
/// If the group is gracefully shutting down, cancelling, or already finished, the added service will not be started.
/// - Parameters:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apologies for the late review - it'd be great to call out that this can also be used for adding services before the group was started. Right now the comment leaves that ambiguous of what happens.

/// - serviceConfiguration: The service configuration to add.
public func addServiceUnlessShutdown(_ serviceConfiguration: ServiceGroupConfiguration.ServiceConfiguration) async {
switch self.state {
case var .initial(services: services):
self.state = .initial(services: [])
services.append(serviceConfiguration)
self.state = .initial(services: services)

case .running(_, let addedServiceChannel):
await addedServiceChannel.send(serviceConfiguration)

case .finished:
// Since this is a best effort operation we don't have to do anything here
return
}
}

/// Adds a new service to the group.
///
/// If the group is currently running, the added service will be started immediately.
/// If the group is gracefully shutting down, cancelling, or already finished, the added service will not be started.
/// - Parameters:
/// - service: The service to add.
public func addServiceUnlessShutdown(_ service: any Service) async {
await self.addServiceUnlessShutdown(ServiceGroupConfiguration.ServiceConfiguration(service: service))
}

/// Runs all the services by spinning up a child task per service.
/// Furthermore, this method sets up the correct signal handlers
/// for graceful shutdown.
Expand All @@ -128,16 +162,19 @@ public actor ServiceGroup: Sendable, Service {
}

let (gracefulShutdownStream, gracefulShutdownContinuation) = AsyncStream.makeStream(of: Void.self)
let addedServiceChannel = AsyncChannel<ServiceGroupConfiguration.ServiceConfiguration>()

self.state = .running(
gracefulShutdownStreamContinuation: gracefulShutdownContinuation
gracefulShutdownStreamContinuation: gracefulShutdownContinuation,
addedServiceChannel: addedServiceChannel
)

var potentialError: Error?
do {
try await self._run(
services: &services,
gracefulShutdownStream: gracefulShutdownStream
gracefulShutdownStream: gracefulShutdownStream,
addedServiceChannel: addedServiceChannel
)
} catch {
potentialError = error
Expand Down Expand Up @@ -173,7 +210,7 @@ public actor ServiceGroup: Sendable, Service {
self.state = .finished
return

case .running(let gracefulShutdownStreamContinuation):
case .running(let gracefulShutdownStreamContinuation, _):
// We cannot transition to shuttingDown here since we are signalling over to the task
// that runs `run`. This task is responsible for transitioning to shuttingDown since
// there might be multiple signals racing to trigger it
Expand All @@ -198,11 +235,13 @@ public actor ServiceGroup: Sendable, Service {
case gracefulShutdownFinished
case gracefulShutdownTimedOut
case cancellationCaught
case newServiceAdded(ServiceGroupConfiguration.ServiceConfiguration)
}

private func _run(
services: inout [ServiceGroupConfiguration.ServiceConfiguration],
gracefulShutdownStream: AsyncStream<Void>
gracefulShutdownStream: AsyncStream<Void>,
addedServiceChannel: AsyncChannel<ServiceGroupConfiguration.ServiceConfiguration>
) async throws {
self.logger.debug(
"Starting service lifecycle",
Expand Down Expand Up @@ -280,25 +319,12 @@ public actor ServiceGroup: Sendable, Service {
let gracefulShutdownManager = GracefulShutdownManager()
gracefulShutdownManagers.append(gracefulShutdownManager)

// This must be addTask and not addTaskUnlessCancelled
// because we must run all the services for the below logic to work.
group.addTask {
return await TaskLocals.$gracefulShutdownManager.withValue(gracefulShutdownManager) {
do {
try await serviceConfiguration.service.run()
return .serviceFinished(service: serviceConfiguration, index: index)
} catch {
return .serviceThrew(service: serviceConfiguration, index: index, error: error)
}
}
}
}

group.addTask {
// This child task is waiting forever until the group gets cancelled.
let (stream, _) = AsyncStream.makeStream(of: Void.self)
await stream.first { _ in true }
return .cancellationCaught
self.addServiceTask(
group: &group,
service: serviceConfiguration,
gracefulShutdownManager: gracefulShutdownManager,
index: index
)
}

// We are storing the services in an optional array now. When a slot in the array is
Expand All @@ -310,12 +336,52 @@ public actor ServiceGroup: Sendable, Service {
"We did not create a graceful shutdown manager per service"
)

group.addTask {
// This child task is waiting forever until the group gets cancelled.
let (stream, _) = AsyncStream.makeStream(of: Void.self)
await stream.first { _ in true }
return .cancellationCaught
}

// Adds a task that listens to added services and funnels them into the task group
self.addAddedServiceListenerTask(group: &group, channel: addedServiceChannel)

// We are going to wait for any of the services to finish or
// the signal sequence to throw an error.
while !group.isEmpty {
let result: ChildTaskResult? = try await group.next()

switch result {
case .newServiceAdded(let serviceConfiguration):
self.logger.debug(
"Starting added service",
metadata: [
self.loggingConfiguration.keys.serviceKey: "\(serviceConfiguration.service)"
]
)

let gracefulShutdownManager = GracefulShutdownManager()
gracefulShutdownManagers.append(gracefulShutdownManager)
services.append(serviceConfiguration)

precondition(
services.count == gracefulShutdownManagers.count,
"Mismatch between services and graceful shutdown managers"
)

self.addServiceTask(
group: &group,
service: serviceConfiguration,
gracefulShutdownManager: gracefulShutdownManager,
index: services.count - 1
)

// Each listener task can only handle a single added service, so we must add a new listener
self.addAddedServiceListenerTask(
group: &group,
channel: addedServiceChannel
)

case .serviceFinished(let service, let index):
if group.isCancelled {
// The group is cancelled and we expect all services to finish
Expand Down Expand Up @@ -530,10 +596,13 @@ public actor ServiceGroup: Sendable, Service {
group: inout ThrowingTaskGroup<ChildTaskResult, Error>,
gracefulShutdownManagers: [GracefulShutdownManager]
) async throws {
guard case .running = self.state else {
guard case let .running(_, addedServiceChannel) = self.state else {
fatalError("Unexpected state")
}

// Signal to stop adding new services (it is important that no new services are added after this point)
addedServiceChannel.finish()

if #available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *),
let maximumGracefulShutdownDuration = self.maximumGracefulShutdownDuration
{
Expand Down Expand Up @@ -717,6 +786,10 @@ public actor ServiceGroup: Sendable, Service {
// We are going to continue the result loop since we have to wait for our service
// to finish.
break

case .newServiceAdded:
// Since adding services is best effort, we simply ignore this
break
}
}
}
Expand Down Expand Up @@ -777,6 +850,45 @@ public actor ServiceGroup: Sendable, Service {
cancellationTimeoutTask = nil
}
}

private func addServiceTask(
group: inout ThrowingTaskGroup<ChildTaskResult, Error>,
service serviceConfiguration: ServiceGroupConfiguration.ServiceConfiguration,
gracefulShutdownManager: GracefulShutdownManager,
index: Int
) {
// This must be addTask and not addTaskUnlessCancelled
// because we must run all the services for the shutdown logic to work.
group.addTask {
return await TaskLocals.$gracefulShutdownManager.withValue(gracefulShutdownManager) {
do {
try await serviceConfiguration.service.run()
return .serviceFinished(service: serviceConfiguration, index: index)
} catch {
return .serviceThrew(service: serviceConfiguration, index: index, error: error)
}
}
}
}

private func addAddedServiceListenerTask(
group: inout ThrowingTaskGroup<ChildTaskResult, Error>,
channel: AsyncChannel<ServiceGroupConfiguration.ServiceConfiguration>
) {
group.addTask {
return await withTaskCancellationHandler {
var iterator = channel.makeAsyncIterator()
if let addedService = await iterator.next() {
return .newServiceAdded(addedService)
}

return .gracefulShutdownFinished
} onCancel: {
// Without this we can get stuck in `addService` if the group
channel.finish()
}
}
}
}

// This should be removed once we support Swift 5.9+
Expand Down
71 changes: 71 additions & 0 deletions Tests/ServiceLifecycleTests/MockService.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import ServiceLifecycle

actor MockService: Service, CustomStringConvertible {
enum Event {
case run
case runPing
case runCancelled
case shutdownGracefully
}

let events: AsyncStream<Event>
internal private(set) var hasRun: Bool = false

private let eventsContinuation: AsyncStream<Event>.Continuation

private var runContinuation: CheckedContinuation<Void, Error>?

nonisolated let description: String

private let pings: AsyncStream<Void>
private nonisolated let pingContinuation: AsyncStream<Void>.Continuation

init(
description: String
) {
var eventsContinuation: AsyncStream<Event>.Continuation!
self.events = AsyncStream<Event> { eventsContinuation = $0 }
self.eventsContinuation = eventsContinuation!

var pingContinuation: AsyncStream<Void>.Continuation!
self.pings = AsyncStream<Void> { pingContinuation = $0 }
self.pingContinuation = pingContinuation!

self.description = description
}

func run() async throws {
self.hasRun = true

try await withTaskCancellationHandler {
try await withGracefulShutdownHandler {
try await withThrowingTaskGroup(of: Void.self) { group in
group.addTask {
self.eventsContinuation.yield(.run)
for await _ in self.pings {
self.eventsContinuation.yield(.runPing)
}
}

try await withCheckedThrowingContinuation {
self.runContinuation = $0
}

group.cancelAll()
}
} onGracefulShutdown: {
self.eventsContinuation.yield(.shutdownGracefully)
}
} onCancel: {
self.eventsContinuation.yield(.runCancelled)
}
}

func resumeRunContinuation(with result: Result<Void, Error>) {
self.runContinuation?.resume(with: result)
}

nonisolated func sendPing() {
self.pingContinuation.yield()
}
}
Loading