Iterating over net socket messages with async / await in Swift – Donny Wals


Printed on: January 24, 2023

In iOS 13, we gained the power to simply ship and obtain information utilizing net sockets by way of URLSession. With async/await, we gained the power to fetch information from servers utilizing the await key phrase and we will iterate over asynchronous sequences utilizing async for loops.

We are able to even learn information from a URL one line at a time by calling the traces property on URL:

let url = URL(string: "https://donnywals.com")!

for attempt await line in url.traces {
    // use line
}

Whereas that is actually cool and permits us to construct apps that ingest information in actual time if the server helps streaming our bodies, we can’t use the traces property to arrange an internet socket connection and hear for incoming messages and probably ship messages over the identical connection too.

On this put up, you’ll study every part you should learn about constructing your personal mechanism to conveniently iterate over messages from an internet socket asynchronously. We are going to leverage some present performance from URLSessionWebSocketTask and AsyncThrowingStream to construct our personal AsyncSequence that conveniently wraps our URLSessionWebSocketTask.

Notice that the ensuing code has solely had comparatively restricted testing performed so I can’t assure that the supplied answer will probably be 100% right for every part you throw at it. In case you discover any points with the ultimate code, be at liberty to contact me. Bonus factors should you’re capable of present some concepts for a possible repair.

Utilizing an internet socket with out async / await

Earlier than we get began, let’s shortly assessment find out how to use an internet socket with out async/await. The code particulars are outlined in this put up. Remember to learn it if you wish to study extra about utilizing net sockets in your apps.


let url = URL(string: "ws://127.0.0.1:8080")!
let socketConnection = URLSession.shared.webSocketTask(with: url)
socketConnection.resume()

func setReceiveHandler() {
    socketConnection.obtain { end in
        defer { self.setReceiveHandler() }

        do {
            let message = attempt outcome.get()
            change message {
            case let .string(string):
                print(string)
            case let .information(information):
                print(information)
            @unknown default:
                print("unkown message obtained")
            }
        } catch {
            // deal with the error
            print(error)
        }
    }
}

setReceiveHandler()

Discover how, to obtain messages from the socket, I have to name obtain with a completion handler. This methodology solely permits me to obtain a single incoming message, so I have to re-set my handler after receiving a message to routinely start listening for the following message.

It is a nice instance of a state of affairs the place an async for loop akin to for attempt await message in socketConnection would make numerous sense. Sadly, this isn’t potential out of the field. Nevertheless, URLSessionWebSocketTask gives some type of help for async / await so we’re not completely out of luck.

A fundamental implementation of net sockets with async / await

Whereas URLSessionWebSocketTask doesn’t expose an AsyncSequence that emits incoming messages out of the field, it does include an async model of the obtain methodology you noticed earlier.

This enables us to rewrite the instance above as an async methodology as follows:

func setReceiveHandler() async {
    do {
        let message = attempt await socketConnection.obtain()

        change message {
        case let .string(string):
          print(string)
        case let .information(information):
          print(information)
        @unknown default:
          print("unkown message obtained")
        }
    } catch {
        print(error)
    }

    await setReceiveHandler()
}

This code works simply tremendous, besides we don’t actually have a way to cease the recursion right here. The code you noticed earlier really has the very same challenge; there’s no situation to cease listening for net socket messages even when the net socket connection has already been closed.

See also  ios - Hyperlink react_native_tracking_transparency (arm64) 0.2 seconds * Undefined image: _RCTRegisterModule

We might enhance our code by solely recursing if:

  1. We didn’t encounter any errors
  2. The socket connection remains to be energetic

This might look a bit as follows:

func setReceiveHandler() async {
    guard socketConnection.closeCode == .invalid else {
        return
    }

    do {
        let message = attempt await socketConnection.obtain()

        change message {
        case let .string(string):
          print(string)
        case let .information(information):
          print(information)
        @unknown default:
          print("unkown message obtained")
        }

        await setReceiveHandler()
    } catch {
        print(error)
    }
}

An open net socket’s closed code is at all times mentioned to invalid to sign that the connection has not (but) been closed. We are able to leverage this to test that our connection remains to be energetic earlier than ready for the following message to be obtained.

That is a lot better already as a result of we respect closed sockets and failures a lot nicer now, however we might enhance the readability of this code a tiny bit by leveraging a whereas loop as an alternative of recursively calling the setReceiveHandler perform:

func setReceiveHandler() async {
    var isActive = true

    whereas isActive && socketConnection.closeCode == .invalid {
        do {
            let message = attempt await socketConnection.obtain()

            change message {
            case let .string(string):
              print(string)
            case let .information(information):
              print(information)
            @unknown default:
              print("unkown message obtained")
            }
        } catch {
            print(error)
            isActive = false
        }
    }
}

To me, this model of the code is barely simpler to learn however that may not be the case for you. It’s functionally equal so you’ll be able to select to make use of whichever choice fits you greatest.

Whereas this code works, I’m not fairly pleased with the place we’ve landed proper now. There’s numerous logic on this perform and I would like to separate dealing with the incoming values from the calls to socketConnection.obtain() in some way. Ideally, I ought to have the ability to write the next:

do {
    for attempt await message in socketConnection {
        change message {
        case let .string(string):
            print(string)
        case let .information(information):
            print(information)
        @unknown default:
            print("unkown message obtained")
      }
} catch {
    // deal with error
}

That is a lot, a lot nicer from a call-site perspective and it could enable us to place the ugly bits elsewhere.

To do that, we will leverage the ability of AsyncStream which permits us to construct a customized async sequence of values.

Utilizing AsyncStream to emit net socket messages

Given our finish objective, there are just a few methods for us to get the place we wish to be. The best means could be to jot down a perform in an extension on URLSessionWebSocketTask that might encapsulate the whereas loop you noticed earlier. This implementation would look as follows:

typealias WebSocketStream = AsyncThrowingStream<URLSessionWebSocketTask.Message, Error>

public extension URLSessionWebSocketTask {    
    var stream: WebSocketStream {
        return WebSocketStream { continuation in
            Process {
                var isAlive = true

                whereas isAlive && closeCode == .invalid {
                    do {
                        let worth = attempt await obtain()
                        continuation.yield(worth)
                    } catch {
                        continuation.end(throwing: error)
                        isAlive = false
                    }
                }
            }
        }
    }
}

To make the code a bit bit simpler to learn, I’ve outlined a typealias for my AsyncThrowingStream so we don’t have to take a look at the identical lengthy sort signature all over.

See also  File add API server in Vapor 4

The code above creates an occasion of AsyncThrowingStream that asynchronously awaits new values from the net socket so long as the net socket is taken into account energetic and hasn’t been closed. To emit incoming messages and potential errors, the continuation’s yield and end strategies are used. These strategies will both emit a brand new worth (yield) or finish the stream of values with an error (end).

This code works nice in lots of conditions, however there may be one challenge. If we resolve to shut the net socket connection from the app’s aspect by calling cancel(with:purpose:) on our socketConnection, our WebSocketStream doesn’t finish. As an alternative, it is going to be caught ready for messages, and the decision website will probably be caught too.

Process {
    attempt await Process.sleep(for: .seconds(5))
    attempt await socketConnection.cancel(with: .goingAway, purpose: nil)
}

Process {    
    do {
        for attempt await message in socketConnection.stream2 {
            // deal with incoming messages
        }
    } catch {
        // deal with error
    }

    print("this might by no means be printed")
}

If every part works as anticipated, our net socket connection will shut after 5 seconds. At that time, our for loop ought to finish and our print assertion ought to execute, because the asynchronous stream is now not energetic. Sadly, this isn’t the case, so we have to discover a higher technique to mannequin our stream.

URLSessionWebSocketTask doesn’t present a means for us to detect cancellation. So, I’ve discovered that it’s best to make use of an object that wraps the URLSessionWebSocketTask, and to cancel the duty by way of that object. This enables us to each finish the async stream we’re offering to callers and shut the net socket reference to one methodology name.

Right here’s what that object appears like:

class SocketStream: AsyncSequence {
    typealias AsyncIterator = WebSocketStream.Iterator
    typealias Ingredient = URLSessionWebSocketTask.Message

    personal var continuation: WebSocketStream.Continuation?
    personal let process: URLSessionWebSocketTask

    personal lazy var stream: WebSocketStream = {
        return WebSocketStream { continuation in
            self.continuation = continuation

            Process {
                var isAlive = true

                whereas isAlive && process.closeCode == .invalid {
                    do {
                        let worth = attempt await process.obtain()
                        continuation.yield(worth)
                    } catch {
                        continuation.end(throwing: error)
                        isAlive = false
                    }
                }
            }
        }
    }()

    init(process: URLSessionWebSocketTask) {
        self.process = process
        process.resume()
    }

    deinit {
        continuation?.end()
    }

    func makeAsyncIterator() -> AsyncIterator {
        return stream.makeAsyncIterator()
    }

    func cancel() async throws {
        process.cancel(with: .goingAway, purpose: nil)
        continuation?.end()
    }
}

There’s a bunch of code right here, nevertheless it’s not too unhealthy. The primary few traces are all about organising some sort aliases and properties for comfort. The lazy var stream is basically the very same code that you just’ve already within the URLSessionWebSocketTask extension from earlier than.

When our SocketStream‘s deinit known as we guarantee that we finish our stream. There’s additionally a cancel methodology that closes the socket connection in addition to the stream. As a result of SocketStream conforms to AsyncSequence we should present an Iterator object that’s used once we attempt to iterate over our SocketStreams. We merely ask our inner stream object to make an iterator and use that as our return worth.

See also  iOS Construct Failed problem with Ionic 6, Cordova 11 & Xcode 13

Utilizing the code above appears as follows:

let url = URL(string: "ws://127.0.0.1:8080")!
let socketConnection = URLSession.shared.webSocketTask(with: url)
let stream = SocketStream(process: socketConnection)

Process {  
    do {
        for attempt await message in stream {
            // deal with incoming messages
        }
    } catch {
        // deal with error
    }

    print("this will probably be printed as soon as the stream ends")
}

To cancel our stream after 5 seconds similar to earlier than, you’ll be able to run the next process in parallel with our iterating process:

Process {
    attempt await Process.sleep(for: .seconds(5))
    attempt await stream.cancel()
}

Process {
    // iterate...
}

Whereas that is fairly cool, we do have a little bit of a problem right here due to the next little bit of code:

personal lazy var stream: WebSocketStream = {
    return WebSocketStream { continuation in
        self.continuation = continuation

        Process {
            var isAlive = true

            whereas isAlive && process.closeCode == .invalid {
                do {
                    let worth = attempt await process.obtain()
                    continuation.yield(worth)
                } catch {
                    continuation.end(throwing: error)
                    isAlive = false
                }
            }
        }
    }
}()

The duty that we run our whereas loop in gained’t finish except we finish our stream from inside our catch block. If we manually shut the net socket connection utilizing the cancel methodology we write earlier, the decision to obtain() won’t ever obtain an error nor a price which signifies that it is going to be caught eternally.

Probably the most dependable technique to repair that is to return to the callback based mostly model of obtain to drive your async stream:

personal lazy var stream: WebSocketStream = {
    return WebSocketStream { continuation in
        self.continuation = continuation
        waitForNextValue()
    }
}()

personal func waitForNextValue() {
    guard process.closeCode == .invalid else {
        continuation?.end()
        return
    }

    process.obtain(completionHandler: { [weak self] end in
        guard let continuation = self?.continuation else {
            return
        }

        do {
            let message = attempt outcome.get()
            continuation.yield(message)
            self?.waitForNextValue()
        } catch {
            continuation.end(throwing: error)
        }
    })
}

With this method we don’t have any lingering duties, and our name website is as clear and concise as ever; we’ve solely modified a few of our inner logic.

In Abstract

Swift Concurrency gives many helpful options for writing higher code, and Apple shortly adopted async / await for present APIs. Nevertheless, some APIs that might be helpful are lacking, akin to iterating over net socket messages.

On this put up, you realized find out how to use async streams to create an async sequence that emits net socket messages. You first noticed a totally async / await model that was neat, however had reminiscence and process lifecycle points. Then, you noticed a model that mixes a callback-based method with the async stream.

The result’s a simple technique to iterate over incoming net socket messages with async / await. In case you have any questions, feedback, or enhancements for this put up, please do not hesitate to achieve out to me on Twitter.

Leave a Reply