diff --git a/Package.resolved b/Package.resolved index cbcceba..aab3006 100644 --- a/Package.resolved +++ b/Package.resolved @@ -1,15 +1,6 @@ { "originHash" : "4a778111861cbfa51bf1ddb4a59475b19b79249afde443a29b8c9edd66d656a7", "pins" : [ - { - "identity" : "swift-dotenv", - "kind" : "remoteSourceControl", - "location" : "https://github.com/thebarndog/swift-dotenv.git", - "state" : { - "revision" : "d13062940a3cdb2b13efa22ecc83b07b21c2e8dc", - "version" : "2.1.0" - } - }, { "identity" : "xmlcoder", "kind" : "remoteSourceControl", diff --git a/Sources/DiscordKit/GatewayClient.swift b/Sources/DiscordKit/GatewayClient.swift index e0b2895..ee1c5b3 100644 --- a/Sources/DiscordKit/GatewayClient.swift +++ b/Sources/DiscordKit/GatewayClient.swift @@ -40,11 +40,10 @@ actor GatewayClient { self.hbTask = Task.detached { [self] in do { try await Task.sleep(for: .milliseconds(Int.random(in: 0...helloMessage.heartbeat_interval))) - try await sendHeartbeat() while !Task.isCancelled { - try await Task.sleep(for: .milliseconds(helloMessage.heartbeat_interval)) try await sendHeartbeat() + try await Task.sleep(for: .milliseconds(helloMessage.heartbeat_interval)) } } catch { print("Heartbeat task canceled") @@ -75,36 +74,54 @@ actor GatewayClient { var gwMessage: GatewayMessage? = nil let json = JSONDecoder() while gwMessage == nil { + var wsMessage: URLSessionWebSocketTask.Message? = nil do { - let wsMessage = try await ws.receive() -#if DEBUG - //print(wsMessage) -#endif - guard case .string(let str) = wsMessage else { throw GatewayError.invalidMessage } - strBuffer.append(str) - gwMessage = try json.decode(GatewayMessage.self, from: Data(strBuffer.utf8)) - } catch URLError.networkConnectionLost { - self.open = false + wsMessage = try await ws.receive() + } catch { + print("Error listening to gateway: \(error)") try await reconnect() + } + + guard let wsMessage = wsMessage else { continue } +#if DEBUG + //print(wsMessage) +#endif + guard case .string(let str) = wsMessage else { throw GatewayError.invalidMessage } + strBuffer.append(str) + do { + gwMessage = try json.decode(GatewayMessage.self, from: Data(strBuffer.utf8)) } catch DecodingError.dataCorrupted { continue } } guard let gwMessage = gwMessage else { throw GatewayError.invalidMessage } sequenceNum = gwMessage.s ?? sequenceNum + if gwMessage.d == .heartbeatAck { hbAck = true } return gwMessage } private func reconnect() async throws { + open = false + ws.cancel() hbTask?.cancel() do { try await attemptResume() } catch { - print(error) - ws.cancel() - let queryItems = [URLQueryItem(name: "v", value: "10"), URLQueryItem(name: "encoding", value: "json")] - ws = URLSession.shared.webSocketTask(with: gatewayURL.appending(queryItems: queryItems)) - try await openConnection() + print("Error resuming session: \(error)") + while (!open) { + open = false + ws.cancel() + hbTask?.cancel() + let queryItems = [URLQueryItem(name: "v", value: "10"), URLQueryItem(name: "encoding", value: "json")] + ws = URLSession.shared.webSocketTask(with: gatewayURL.appending(queryItems: queryItems)) + do { + try await openConnection() + } catch { + print("Error reconnecting: \(error)") + } + if (open) { break } + try await Task.sleep(for: .seconds(5)) + } } } @@ -141,10 +158,16 @@ actor GatewayClient { self.open = true } + var hbAck = false private func sendHeartbeat() async throws { guard self.open else { print("conn closed, skipping hb"); return } + hbAck = false let hbMessage = "{\"op\":1,\"d\":\(sequenceNum == nil ? "null" : String(sequenceNum!))}" try await ws.send(.string(hbMessage)) + try await Task.sleep(for: .seconds(5)) + if !hbAck { + ws.cancel(with: .normalClosure, reason: nil) + } } var events: AsyncStream { diff --git a/Sources/DiscordKit/Models.swift b/Sources/DiscordKit/Models.swift index 761c4e1..1299e2a 100644 --- a/Sources/DiscordKit/Models.swift +++ b/Sources/DiscordKit/Models.swift @@ -47,10 +47,11 @@ public struct SessionStartLimit: Codable, Sendable { public let max_concurrency: Int } -public enum GatewayPayload: Decodable, Sendable { +public enum GatewayPayload: Decodable, Sendable, Equatable { case hello(GatewayHello) case messageCreate(MessageCreate) case ready(GatewayReady) + case heartbeatAck } public struct GatewayMessage: Decodable, Sendable { @@ -72,6 +73,8 @@ public struct GatewayMessage: Decodable, Sendable { s = try container.decode(Int?.self, forKey: .s) t = try container.decode(String?.self, forKey: .t) switch op { + case 11: + d = .heartbeatAck case 10: let hello = try container.decode(GatewayHello.self, forKey: .d) d = .hello(hello) @@ -95,12 +98,12 @@ public struct GatewayMessage: Decodable, Sendable { } } -public struct GatewayReady: Codable, Sendable { +public struct GatewayReady: Codable, Sendable, Equatable { public let session_id: String public let resume_gateway_url: URL } -public struct GatewayHello: Codable, Sendable { +public struct GatewayHello: Codable, Sendable, Equatable { public let heartbeat_interval: Int } @@ -172,7 +175,7 @@ public struct MessageRefrence: Codable, Sendable { let guild_id: String? } -public struct MessageCreate: Codable, Sendable { +public struct MessageCreate: Codable, Sendable, Equatable { public let id: String public let channel_id: String public let guild_id: String? @@ -183,19 +186,19 @@ public struct MessageCreate: Codable, Sendable { } -public struct User: Codable, Sendable { +public struct User: Codable, Sendable, Equatable { public let id: String? public let bot: Bool? public let global_name: String? public let username: String } -public struct GuildMember: Codable, Sendable { +public struct GuildMember: Codable, Sendable, Equatable { public let user: User? public let nick: String? } -public struct MentionUser: Codable, Sendable { +public struct MentionUser: Codable, Sendable, Equatable { public let id: String? public let bot: Bool? public let global_name: String?