improved disconnection logic (hopefully)
This commit is contained in:
@@ -40,11 +40,10 @@ actor GatewayClient {
|
||||
self.hbTask = Task.detached { [self] in
|
||||
do {
|
||||
try await Task.sleep(for: .milliseconds(Int.random(in: 0...helloMessage.heartbeat_interval)))
|
||||
try await sendHeartbeat()
|
||||
|
||||
while !Task.isCancelled {
|
||||
try await Task.sleep(for: .milliseconds(helloMessage.heartbeat_interval))
|
||||
try await sendHeartbeat()
|
||||
try await Task.sleep(for: .milliseconds(helloMessage.heartbeat_interval))
|
||||
}
|
||||
} catch {
|
||||
print("Heartbeat task canceled")
|
||||
@@ -75,36 +74,54 @@ actor GatewayClient {
|
||||
var gwMessage: GatewayMessage? = nil
|
||||
let json = JSONDecoder()
|
||||
while gwMessage == nil {
|
||||
var wsMessage: URLSessionWebSocketTask.Message? = nil
|
||||
do {
|
||||
let wsMessage = try await ws.receive()
|
||||
#if DEBUG
|
||||
//print(wsMessage)
|
||||
#endif
|
||||
guard case .string(let str) = wsMessage else { throw GatewayError.invalidMessage }
|
||||
strBuffer.append(str)
|
||||
gwMessage = try json.decode(GatewayMessage.self, from: Data(strBuffer.utf8))
|
||||
} catch URLError.networkConnectionLost {
|
||||
self.open = false
|
||||
wsMessage = try await ws.receive()
|
||||
} catch {
|
||||
print("Error listening to gateway: \(error)")
|
||||
try await reconnect()
|
||||
}
|
||||
|
||||
guard let wsMessage = wsMessage else { continue }
|
||||
#if DEBUG
|
||||
//print(wsMessage)
|
||||
#endif
|
||||
guard case .string(let str) = wsMessage else { throw GatewayError.invalidMessage }
|
||||
strBuffer.append(str)
|
||||
do {
|
||||
gwMessage = try json.decode(GatewayMessage.self, from: Data(strBuffer.utf8))
|
||||
} catch DecodingError.dataCorrupted {
|
||||
continue
|
||||
}
|
||||
}
|
||||
guard let gwMessage = gwMessage else { throw GatewayError.invalidMessage }
|
||||
sequenceNum = gwMessage.s ?? sequenceNum
|
||||
if gwMessage.d == .heartbeatAck { hbAck = true }
|
||||
return gwMessage
|
||||
}
|
||||
|
||||
private func reconnect() async throws {
|
||||
open = false
|
||||
ws.cancel()
|
||||
hbTask?.cancel()
|
||||
do {
|
||||
try await attemptResume()
|
||||
} catch {
|
||||
print(error)
|
||||
ws.cancel()
|
||||
let queryItems = [URLQueryItem(name: "v", value: "10"), URLQueryItem(name: "encoding", value: "json")]
|
||||
ws = URLSession.shared.webSocketTask(with: gatewayURL.appending(queryItems: queryItems))
|
||||
try await openConnection()
|
||||
print("Error resuming session: \(error)")
|
||||
while (!open) {
|
||||
open = false
|
||||
ws.cancel()
|
||||
hbTask?.cancel()
|
||||
let queryItems = [URLQueryItem(name: "v", value: "10"), URLQueryItem(name: "encoding", value: "json")]
|
||||
ws = URLSession.shared.webSocketTask(with: gatewayURL.appending(queryItems: queryItems))
|
||||
do {
|
||||
try await openConnection()
|
||||
} catch {
|
||||
print("Error reconnecting: \(error)")
|
||||
}
|
||||
if (open) { break }
|
||||
try await Task.sleep(for: .seconds(5))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -141,10 +158,16 @@ actor GatewayClient {
|
||||
self.open = true
|
||||
}
|
||||
|
||||
var hbAck = false
|
||||
private func sendHeartbeat() async throws {
|
||||
guard self.open else { print("conn closed, skipping hb"); return }
|
||||
hbAck = false
|
||||
let hbMessage = "{\"op\":1,\"d\":\(sequenceNum == nil ? "null" : String(sequenceNum!))}"
|
||||
try await ws.send(.string(hbMessage))
|
||||
try await Task.sleep(for: .seconds(5))
|
||||
if !hbAck {
|
||||
ws.cancel(with: .normalClosure, reason: nil)
|
||||
}
|
||||
}
|
||||
|
||||
var events: AsyncStream<GatewayPayload> {
|
||||
|
||||
Reference in New Issue
Block a user