diff --git a/Libraries/WebSocket/RCTSRWebSocket.m b/Libraries/WebSocket/RCTSRWebSocket.m index 7febefecfdee64..96de0d396b28bb 100644 --- a/Libraries/WebSocket/RCTSRWebSocket.m +++ b/Libraries/WebSocket/RCTSRWebSocket.m @@ -217,6 +217,8 @@ @implementation RCTSRWebSocket int _closeCode; BOOL _isPumping; + + BOOL _cleanupScheduled; NSMutableSet *_scheduledRunloops; @@ -324,17 +326,11 @@ - (void)dealloc [_inputStream close]; [_outputStream close]; - - _workQueue = NULL; - + if (_receivedHTTPHeaders) { CFRelease(_receivedHTTPHeaders); _receivedHTTPHeaders = NULL; } - - if (_delegateDispatchQueue) { - _delegateDispatchQueue = NULL; - } } #ifndef NDEBUG @@ -626,11 +622,11 @@ - (void)_failWithError:(NSError *)error; }]; self.readyState = RCTSR_CLOSED; - self->_selfRetain = nil; - + RCTSRLog(@"Failing with error %@", error.localizedDescription); [self _disconnect]; + [self _scheduleCleanup]; } }); } @@ -1036,12 +1032,7 @@ - (void)_pumpWriting; !_sentClose) { _sentClose = YES; - [_outputStream close]; - [_inputStream close]; - - for (NSArray *runLoop in [_scheduledRunloops copy]) { - [self unscheduleFromRunLoop:runLoop[0] forMode:runLoop[1]]; - } + [self _scheduleCleanup]; if (!_failed) { [self _performDelegateBlock:^{ @@ -1050,8 +1041,6 @@ - (void)_pumpWriting; } }]; } - - _selfRetain = nil; } } @@ -1345,94 +1334,142 @@ - (void)stream:(NSStream *)aStream handleEvent:(NSStreamEvent)eventCode; } } } - - assert(_workQueue != NULL); + + // _workQueue cannot be NULL + if (!_workQueue) { + return; + } + __weak typeof(self) weakSelf = self; dispatch_async(_workQueue, ^{ - switch (eventCode) { - case NSStreamEventOpenCompleted: { - RCTSRLog(@"NSStreamEventOpenCompleted %@", aStream); - if (self.readyState >= RCTSR_CLOSING) { - return; - } - assert(self->_readBuffer); - - if (self.readyState == RCTSR_CONNECTING && aStream == self->_inputStream) { - [self didConnect]; - } - [self _pumpWriting]; - [self _pumpScanner]; - break; - } - - case NSStreamEventErrorOccurred: { - RCTSRLog(@"NSStreamEventErrorOccurred %@ %@", aStream, [aStream.streamError copy]); - // TODO: specify error better! - [self _failWithError:aStream.streamError]; - self->_readBufferOffset = 0; - self->_readBuffer.length = 0; - break; + typeof(self) strongSelf = weakSelf; + if (!strongSelf) { + return; + } + [strongSelf safeHandleEvent:eventCode stream:aStream]; + }); +} +- (void)safeHandleEvent:(NSStreamEvent)eventCode stream:(NSStream *)aStream +{ + switch (eventCode) { + case NSStreamEventOpenCompleted: { + RCTSRLog(@"NSStreamEventOpenCompleted %@", aStream); + if (self.readyState >= RCTSR_CLOSING) { + return; } - - case NSStreamEventEndEncountered: { - [self _pumpScanner]; - RCTSRLog(@"NSStreamEventEndEncountered %@", aStream); - if (aStream.streamError) { - [self _failWithError:aStream.streamError]; - } else { - dispatch_async(self->_workQueue, ^{ - if (self.readyState != RCTSR_CLOSED) { - self.readyState = RCTSR_CLOSED; - self->_selfRetain = nil; - } - - if (!self->_sentClose && !self->_failed) { - self->_sentClose = YES; - // If we get closed in this state it's probably not clean because we should be sending this when we send messages - [self _performDelegateBlock:^{ - if ([self.delegate respondsToSelector:@selector(webSocket:didCloseWithCode:reason:wasClean:)]) { - [self.delegate webSocket:self didCloseWithCode:RCTSRStatusCodeGoingAway reason:@"Stream end encountered" wasClean:NO]; - } - }]; - } - }); - } - - break; + assert(self->_readBuffer); + + if (self.readyState == RCTSR_CONNECTING && aStream == self->_inputStream) { + [self didConnect]; } - - case NSStreamEventHasBytesAvailable: { - RCTSRLog(@"NSStreamEventHasBytesAvailable %@", aStream); - const int bufferSize = 2048; - uint8_t buffer[bufferSize]; - - while (self->_inputStream.hasBytesAvailable) { - NSInteger bytes_read = [self->_inputStream read:buffer maxLength:bufferSize]; - - if (bytes_read > 0) { - [self->_readBuffer appendBytes:buffer length:bytes_read]; - } else if (bytes_read < 0) { - [self _failWithError:self->_inputStream.streamError]; + [self _pumpWriting]; + [self _pumpScanner]; + break; + } + + case NSStreamEventErrorOccurred: { + RCTSRLog(@"NSStreamEventErrorOccurred %@ %@", aStream, [aStream.streamError copy]); + // TODO: specify error better! + [self _failWithError:aStream.streamError]; + self->_readBufferOffset = 0; + self->_readBuffer.length = 0; + break; + + } + + case NSStreamEventEndEncountered: { + [self _pumpScanner]; + RCTSRLog(@"NSStreamEventEndEncountered %@", aStream); + if (aStream.streamError) { + [self _failWithError:aStream.streamError]; + } else { + dispatch_async(self->_workQueue, ^{ + if (self.readyState != RCTSR_CLOSED) { + self.readyState = RCTSR_CLOSED; + [self _scheduleCleanup]; } - - if (bytes_read != bufferSize) { - break; + + if (!self->_sentClose && !self->_failed) { + self->_sentClose = YES; + // If we get closed in this state it's probably not clean because we should be sending this when we send messages + [self _performDelegateBlock:^{ + if ([self.delegate respondsToSelector:@selector(webSocket:didCloseWithCode:reason:wasClean:)]) { + [self.delegate webSocket:self didCloseWithCode:RCTSRStatusCodeGoingAway reason:@"Stream end encountered" wasClean:NO]; + } + }]; } - }; - [self _pumpScanner]; - break; + }); } + + break; + } + + case NSStreamEventHasBytesAvailable: { + RCTSRLog(@"NSStreamEventHasBytesAvailable %@", aStream); + const int bufferSize = 2048; + uint8_t buffer[bufferSize]; + + while (self->_inputStream.hasBytesAvailable) { + NSInteger bytes_read = [self->_inputStream read:buffer maxLength:bufferSize]; + + if (bytes_read > 0) { + [self->_readBuffer appendBytes:buffer length:bytes_read]; + } else if (bytes_read < 0) { + [self _failWithError:self->_inputStream.streamError]; + } + + if (bytes_read != bufferSize) { + break; + } + }; + [self _pumpScanner]; + break; + } + + case NSStreamEventHasSpaceAvailable: { + RCTSRLog(@"NSStreamEventHasSpaceAvailable %@", aStream); + [self _pumpWriting]; + break; + } + + default: + RCTSRLog(@"(default) %@", aStream); + break; + } +} - case NSStreamEventHasSpaceAvailable: { - RCTSRLog(@"NSStreamEventHasSpaceAvailable %@", aStream); - [self _pumpWriting]; - break; - } +- (void)_scheduleCleanup +{ + if (_cleanupScheduled) { + return; + } + + _cleanupScheduled = YES; + + // Cleanup NSStream's delegate in the same RunLoop used by the streams themselves: + // This way we'll prevent race conditions between handleEvent and SRWebsocket's dealloc + NSTimer *timer = [NSTimer timerWithTimeInterval:(0.0f) target:self selector:@selector(_cleanupSelfReference:) userInfo:nil repeats:NO]; + [[NSRunLoop RCTSR_networkRunLoop] addTimer:timer forMode:NSDefaultRunLoopMode]; +} - default: - RCTSRLog(@"(default) %@", aStream); - break; - } +- (void)_cleanupSelfReference:(NSTimer *)timer +{ + // Remove the streams, right now, from the networkRunLoop + [_inputStream close]; + [_outputStream close]; + + // Unschedule from RunLoop + for (NSArray *runLoop in [_scheduledRunloops copy]) { + [self unscheduleFromRunLoop:runLoop[0] forMode:runLoop[1]]; + } + + // Nuke NSStream's delegate + _inputStream.delegate = nil; + _outputStream.delegate = nil; + + // Cleanup selfRetain in the same GCD queue as usual + dispatch_async(_workQueue, ^{ + self->_selfRetain = nil; }); }