Skip to content

Support adding services while ServiceGroup is running #199

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 118 additions & 26 deletions Sources/ServiceLifecycle/ServiceGroup.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import Logging
import UnixSignals
import AsyncAlgorithms

/// A ``ServiceGroup`` is responsible for running a number of services, setting up signal handling and signalling graceful shutdown to the services.
public actor ServiceGroup: Sendable, Service {
Expand All @@ -23,7 +24,8 @@ public actor ServiceGroup: Sendable, Service {
case initial(services: [ServiceGroupConfiguration.ServiceConfiguration])
/// The state once ``ServiceGroup/run()`` has been called.
case running(
gracefulShutdownStreamContinuation: AsyncStream<Void>.Continuation
gracefulShutdownStreamContinuation: AsyncStream<Void>.Continuation,
addedServiceChannel: AsyncChannel<ServiceGroupConfiguration.ServiceConfiguration>
)
/// The state once ``ServiceGroup/run()`` has finished.
case finished
Expand Down Expand Up @@ -106,6 +108,37 @@ public actor ServiceGroup: Sendable, Service {
self.maximumCancellationDuration = configuration._maximumCancellationDuration
}

/// Adds a service to the group.
///
/// If the group is currently running, the added service will be started immediately.
/// If the group is gracefully shutting down, cancelling, or already finished, the added service will not be started.
/// - Parameters:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apologies for the late review - it'd be great to call out that this can also be used for adding services before the group was started. Right now the comment leaves that ambiguous of what happens.

/// - serviceConfiguration: The service configuration to add.
public func addService(_ serviceConfiguration: ServiceGroupConfiguration.ServiceConfiguration) async {
switch self.state {
case var .initial(services: services):
self.state = .initial(services: [])
services.append(serviceConfiguration)
self.state = .initial(services: services)

case .running(_, let addedServiceChannel):
await addedServiceChannel.send(serviceConfiguration)

case .finished:
return
}
}

/// Adds a service to the group.
///
/// If the group is currently running, the added service will be started immediately.
/// If the group is gracefully shutting down, cancelling, or already finished, the added service will not be started.
/// - Parameters:
/// - service: The service to add.
public func addService(_ service: any Service) async {
await self.addService(ServiceGroupConfiguration.ServiceConfiguration(service: service))
}

/// Runs all the services by spinning up a child task per service.
/// Furthermore, this method sets up the correct signal handlers
/// for graceful shutdown.
Expand All @@ -128,16 +161,19 @@ public actor ServiceGroup: Sendable, Service {
}

let (gracefulShutdownStream, gracefulShutdownContinuation) = AsyncStream.makeStream(of: Void.self)
let addedServiceChannel = AsyncChannel<ServiceGroupConfiguration.ServiceConfiguration>()

self.state = .running(
gracefulShutdownStreamContinuation: gracefulShutdownContinuation
gracefulShutdownStreamContinuation: gracefulShutdownContinuation,
addedServiceChannel: addedServiceChannel
)

var potentialError: Error?
do {
try await self._run(
services: &services,
gracefulShutdownStream: gracefulShutdownStream
gracefulShutdownStream: gracefulShutdownStream,
addedServiceChannel: addedServiceChannel
)
} catch {
potentialError = error
Expand Down Expand Up @@ -173,7 +209,7 @@ public actor ServiceGroup: Sendable, Service {
self.state = .finished
return

case .running(let gracefulShutdownStreamContinuation):
case .running(let gracefulShutdownStreamContinuation, _):
// We cannot transition to shuttingDown here since we are signalling over to the task
// that runs `run`. This task is responsible for transitioning to shuttingDown since
// there might be multiple signals racing to trigger it
Expand All @@ -189,7 +225,7 @@ public actor ServiceGroup: Sendable, Service {
}
}

private enum ChildTaskResult {
fileprivate enum ChildTaskResult {
case serviceFinished(service: ServiceGroupConfiguration.ServiceConfiguration, index: Int)
case serviceThrew(service: ServiceGroupConfiguration.ServiceConfiguration, index: Int, error: any Error)
case signalCaught(UnixSignal)
Expand All @@ -202,7 +238,8 @@ public actor ServiceGroup: Sendable, Service {

private func _run(
services: inout [ServiceGroupConfiguration.ServiceConfiguration],
gracefulShutdownStream: AsyncStream<Void>
gracefulShutdownStream: AsyncStream<Void>,
addedServiceChannel: AsyncChannel<ServiceGroupConfiguration.ServiceConfiguration>
) async throws {
self.logger.debug(
"Starting service lifecycle",
Expand Down Expand Up @@ -280,25 +317,11 @@ public actor ServiceGroup: Sendable, Service {
let gracefulShutdownManager = GracefulShutdownManager()
gracefulShutdownManagers.append(gracefulShutdownManager)

// This must be addTask and not addTaskUnlessCancelled
// because we must run all the services for the below logic to work.
group.addTask {
return await TaskLocals.$gracefulShutdownManager.withValue(gracefulShutdownManager) {
do {
try await serviceConfiguration.service.run()
return .serviceFinished(service: serviceConfiguration, index: index)
} catch {
return .serviceThrew(service: serviceConfiguration, index: index, error: error)
}
}
}
}

group.addTask {
// This child task is waiting forever until the group gets cancelled.
let (stream, _) = AsyncStream.makeStream(of: Void.self)
await stream.first { _ in true }
return .cancellationCaught
group.addServiceTask(
serviceConfiguration,
gracefulShutdownManager: gracefulShutdownManager,
index: index
)
}

// We are storing the services in an optional array now. When a slot in the array is
Expand All @@ -310,6 +333,49 @@ public actor ServiceGroup: Sendable, Service {
"We did not create a graceful shutdown manager per service"
)

var _unownedTaskGroupHandledCarefully = group
group.addTask {
// This is the task that listens to added services and starts them while the group is running

await withTaskCancellationHandler {
// Channel will be finished in `shutdownGracefully`, we must not add services after graceful shutdown has started
for await serviceConfiguration in addedServiceChannel {
self.logger.debug(
"Starting added service",
metadata: [
self.loggingConfiguration.keys.serviceKey: "\(serviceConfiguration.service)"
]
)

let gracefulShutdownManager = GracefulShutdownManager()
gracefulShutdownManagers.append(gracefulShutdownManager)
services.append(serviceConfiguration)

precondition(
services.count == gracefulShutdownManagers.count,
"Mismatch between services and graceful shutdown managers"
)

_unownedTaskGroupHandledCarefully.addServiceTask(
serviceConfiguration,
gracefulShutdownManager: gracefulShutdownManager,
index: services.count - 1
)
}
} onCancel: {
addedServiceChannel.finish()
}

return .gracefulShutdownFinished
}

group.addTask {
// This child task is waiting forever until the group gets cancelled.
let (stream, _) = AsyncStream.makeStream(of: Void.self)
await stream.first { _ in true }
return .cancellationCaught
}

// We are going to wait for any of the services to finish or
// the signal sequence to throw an error.
while !group.isEmpty {
Expand Down Expand Up @@ -530,10 +596,13 @@ public actor ServiceGroup: Sendable, Service {
group: inout ThrowingTaskGroup<ChildTaskResult, Error>,
gracefulShutdownManagers: [GracefulShutdownManager]
) async throws {
guard case .running = self.state else {
guard case let .running(_, addedServiceChannel) = self.state else {
fatalError("Unexpected state")
}

// Signal to stop adding new services (it is important that no new services are added after this point)
addedServiceChannel.finish()

if #available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *),
let maximumGracefulShutdownDuration = self.maximumGracefulShutdownDuration
{
Expand Down Expand Up @@ -779,6 +848,29 @@ public actor ServiceGroup: Sendable, Service {
}
}

extension ThrowingTaskGroup where Failure == Error, ChildTaskResult == ServiceGroup.ChildTaskResult {
mutating func addServiceTask(
_ serviceConfiguration: ServiceGroupConfiguration.ServiceConfiguration,
gracefulShutdownManager: GracefulShutdownManager,
index: Int
) {
// This must be addTask and not addTaskUnlessCancelled
// because we must run all the services for the shutdown logic to work.
self.addTask {
return await TaskLocals.$gracefulShutdownManager.withValue(gracefulShutdownManager) {
do {
try await serviceConfiguration.service.run()
return .serviceFinished(service: serviceConfiguration, index: index)
} catch {
return .serviceThrew(service: serviceConfiguration, index: index, error: error)
}
}
}

}

}

// This should be removed once we support Swift 5.9+
extension AsyncStream {
fileprivate static func makeStream(
Expand Down
71 changes: 71 additions & 0 deletions Tests/ServiceLifecycleTests/MockService.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import ServiceLifecycle

actor MockService: Service, CustomStringConvertible {
enum Event {
case run
case runPing
case runCancelled
case shutdownGracefully
}

let events: AsyncStream<Event>
internal private(set) var hasRun: Bool = false

private let eventsContinuation: AsyncStream<Event>.Continuation

private var runContinuation: CheckedContinuation<Void, Error>?

nonisolated let description: String

private let pings: AsyncStream<Void>
private nonisolated let pingContinuation: AsyncStream<Void>.Continuation

init(
description: String
) {
var eventsContinuation: AsyncStream<Event>.Continuation!
self.events = AsyncStream<Event> { eventsContinuation = $0 }
self.eventsContinuation = eventsContinuation!

var pingContinuation: AsyncStream<Void>.Continuation!
self.pings = AsyncStream<Void> { pingContinuation = $0 }
self.pingContinuation = pingContinuation!

self.description = description
}

func run() async throws {
self.hasRun = true

try await withTaskCancellationHandler {
try await withGracefulShutdownHandler {
try await withThrowingTaskGroup(of: Void.self) { group in
group.addTask {
self.eventsContinuation.yield(.run)
for await _ in self.pings {
self.eventsContinuation.yield(.runPing)
}
}

try await withCheckedThrowingContinuation {
self.runContinuation = $0
}

group.cancelAll()
}
} onGracefulShutdown: {
self.eventsContinuation.yield(.shutdownGracefully)
}
} onCancel: {
self.eventsContinuation.yield(.runCancelled)
}
}

func resumeRunContinuation(with result: Result<Void, Error>) {
self.runContinuation?.resume(with: result)
}

nonisolated func sendPing() {
self.pingContinuation.yield()
}
}
Loading