blob: 88883a2a02752011d863972e2632bef3c8c45583 [file] [log] [blame]
/*
*
* Copyright (c) 2020-2021 Project CHIP Authors
* Copyright (c) 2014-2017 Nest Labs, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* @file
* This file implements Layer using select().
*/
#include <lib/support/CodeUtils.h>
#include <lib/support/TimeUtils.h>
#include <platform/LockTracker.h>
#include <system/SystemFaultInjection.h>
#include <system/SystemLayer.h>
#include <system/SystemLayerImplSelect.h>
#include <errno.h>
// Choose an approximation of PTHREAD_NULL if pthread.h doesn't define one.
#if CHIP_SYSTEM_CONFIG_POSIX_LOCKING && !defined(PTHREAD_NULL)
#define PTHREAD_NULL 0
#endif // CHIP_SYSTEM_CONFIG_POSIX_LOCKING && !defined(PTHREAD_NULL)
namespace chip {
namespace System {
constexpr Clock::Seconds64 kDefaultMinSleepPeriod = Clock::Seconds64(60 * 60 * 24 * 30); // Month [sec]
CHIP_ERROR LayerImplSelect::Init()
{
VerifyOrReturnError(mLayerState.SetInitializing(), CHIP_ERROR_INCORRECT_STATE);
RegisterPOSIXErrorFormatter();
for (auto & w : mSocketWatchPool)
{
w.Clear();
}
#if CHIP_SYSTEM_CONFIG_POSIX_LOCKING
mHandleSelectThread = PTHREAD_NULL;
#endif // CHIP_SYSTEM_CONFIG_POSIX_LOCKING
// Create an event to allow an arbitrary thread to wake the thread in the select loop.
ReturnErrorOnFailure(mWakeEvent.Open(*this));
VerifyOrReturnError(mLayerState.SetInitialized(), CHIP_ERROR_INCORRECT_STATE);
return CHIP_NO_ERROR;
}
void LayerImplSelect::Shutdown()
{
VerifyOrReturn(mLayerState.SetShuttingDown());
#if CHIP_SYSTEM_CONFIG_USE_DISPATCH
TimerList::Node * timer;
while ((timer = mTimerList.PopEarliest()) != nullptr)
{
if (timer->mTimerSource != nullptr)
{
dispatch_source_cancel(timer->mTimerSource);
dispatch_release(timer->mTimerSource);
}
}
mTimerPool.ReleaseAll();
for (auto & w : mSocketWatchPool)
{
w.DisableAndClear();
}
#else // CHIP_SYSTEM_CONFIG_USE_DISPATCH
mTimerList.Clear();
mTimerPool.ReleaseAll();
#endif // CHIP_SYSTEM_CONFIG_USE_DISPATCH
mWakeEvent.Close(*this);
mLayerState.ResetFromShuttingDown(); // Return to uninitialized state to permit re-initialization.
}
void LayerImplSelect::Signal()
{
/*
* Wake up the I/O thread by writing a single byte to the wake pipe.
*
* If this is being called from within an I/O event callback, then writing to the wake pipe can be skipped,
* since the I/O thread is already awake.
*
* Furthermore, we don't care if this write fails as the only reasonably likely failure is that the pipe is full, in which
* case the select calling thread is going to wake up anyway.
*/
#if CHIP_SYSTEM_CONFIG_POSIX_LOCKING
if (pthread_equal(mHandleSelectThread, pthread_self()))
{
return;
}
#endif // CHIP_SYSTEM_CONFIG_POSIX_LOCKING
// Send notification to wake up the select call.
CHIP_ERROR status = mWakeEvent.Notify();
if (status != CHIP_NO_ERROR)
{
ChipLogError(chipSystemLayer, "System wake event notify failed: %" CHIP_ERROR_FORMAT, status.Format());
}
}
CHIP_ERROR LayerImplSelect::StartTimer(Clock::Timeout delay, TimerCompleteCallback onComplete, void * appState)
{
VerifyOrReturnError(mLayerState.IsInitialized(), CHIP_ERROR_INCORRECT_STATE);
CHIP_SYSTEM_FAULT_INJECT(FaultInjection::kFault_TimeoutImmediate, delay = System::Clock::kZero);
CancelTimer(onComplete, appState);
TimerList::Node * timer = mTimerPool.Create(*this, SystemClock().GetMonotonicTimestamp() + delay, onComplete, appState);
VerifyOrReturnError(timer != nullptr, CHIP_ERROR_NO_MEMORY);
#if CHIP_SYSTEM_CONFIG_USE_DISPATCH
dispatch_queue_t dispatchQueue = GetDispatchQueue();
if (dispatchQueue)
{
(void) mTimerList.Add(timer);
dispatch_source_t timerSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, DISPATCH_TIMER_STRICT, dispatchQueue);
if (timerSource == nullptr)
{
chipDie();
}
timer->mTimerSource = timerSource;
dispatch_source_set_timer(
timerSource, dispatch_walltime(nullptr, static_cast<int64_t>(Clock::Milliseconds64(delay).count() * NSEC_PER_MSEC)),
DISPATCH_TIME_FOREVER, 2 * NSEC_PER_MSEC);
dispatch_source_set_event_handler(timerSource, ^{
dispatch_source_cancel(timerSource);
dispatch_release(timerSource);
this->HandleTimerComplete(timer);
});
dispatch_resume(timerSource);
return CHIP_NO_ERROR;
}
#endif // CHIP_SYSTEM_CONFIG_USE_DISPATCH
if (mTimerList.Add(timer) == timer)
{
// The new timer is the earliest, so the time until the next event has probably changed.
Signal();
}
return CHIP_NO_ERROR;
}
void LayerImplSelect::CancelTimer(TimerCompleteCallback onComplete, void * appState)
{
VerifyOrReturn(mLayerState.IsInitialized());
TimerList::Node * timer = mTimerList.Remove(onComplete, appState);
if (timer == nullptr)
{
// The timer was not in our "will fire in the future" list, but it might
// be in the "we're about to fire these" chunk we already grabbed from
// that list. Check for it there too, and if found there we still want
// to cancel it.
timer = mExpiredTimers.Remove(onComplete, appState);
}
VerifyOrReturn(timer != nullptr);
#if CHIP_SYSTEM_CONFIG_USE_DISPATCH
if (timer->mTimerSource != nullptr)
{
dispatch_source_cancel(timer->mTimerSource);
dispatch_release(timer->mTimerSource);
}
#endif
mTimerPool.Release(timer);
Signal();
}
CHIP_ERROR LayerImplSelect::ScheduleWork(TimerCompleteCallback onComplete, void * appState)
{
VerifyOrReturnError(mLayerState.IsInitialized(), CHIP_ERROR_INCORRECT_STATE);
#if CHIP_SYSTEM_CONFIG_USE_DISPATCH
dispatch_queue_t dispatchQueue = GetDispatchQueue();
if (dispatchQueue)
{
dispatch_async(dispatchQueue, ^{
onComplete(this, appState);
});
return CHIP_NO_ERROR;
}
#endif // CHIP_SYSTEM_CONFIG_USE_DISPATCH
// Ideally we would not use a timer here at all, but if we try to just
// ScheduleLambda the lambda needs to capture the following:
// 1) onComplete
// 2) appState
// 3) The `this` pointer, because onComplete needs to be passed a pointer to
// the System::Layer.
//
// On a 64-bit system that's 24 bytes, but lambdas passed to ScheduleLambda
// are capped at CHIP_CONFIG_LAMBDA_EVENT_SIZE which is 16 bytes.
//
// So for now use a timer as a poor-man's closure that captures `this` and
// onComplete and appState in a single pointer, so we fit inside the size
// limit.
//
// TODO: We could do something here where we compile-time condition on the
// sizes of things and use a direct ScheduleLambda if it would fit and this
// setup otherwise.
//
// TODO: But also, unit tests seem to do SystemLayer::ScheduleWork without
// actually running a useful event loop (in the PlatformManager sense),
// which breaks if we use ScheduleLambda here, since that does rely on the
// PlatformManager event loop. So for now, keep scheduling an expires-ASAP
// timer, but just make sure we don't cancel existing timers with the same
// callback and appState, so ScheduleWork invocations don't stomp on each
// other.
TimerList::Node * timer = mTimerPool.Create(*this, SystemClock().GetMonotonicTimestamp(), onComplete, appState);
VerifyOrReturnError(timer != nullptr, CHIP_ERROR_NO_MEMORY);
if (mTimerList.Add(timer) == timer)
{
// The new timer is the earliest, so the time until the next event has probably changed.
Signal();
}
return CHIP_NO_ERROR;
}
CHIP_ERROR LayerImplSelect::StartWatchingSocket(int fd, SocketWatchToken * tokenOut)
{
// Find a free slot.
SocketWatch * watch = nullptr;
for (auto & w : mSocketWatchPool)
{
if (w.mFD == fd)
{
// Duplicate registration is an error.
return CHIP_ERROR_INVALID_ARGUMENT;
}
if ((w.mFD == kInvalidFd) && (watch == nullptr))
{
watch = &w;
}
}
VerifyOrReturnError(watch != nullptr, CHIP_ERROR_ENDPOINT_POOL_FULL);
watch->mFD = fd;
*tokenOut = reinterpret_cast<SocketWatchToken>(watch);
return CHIP_NO_ERROR;
}
CHIP_ERROR LayerImplSelect::SetCallback(SocketWatchToken token, SocketWatchCallback callback, intptr_t data)
{
SocketWatch * watch = reinterpret_cast<SocketWatch *>(token);
VerifyOrReturnError(watch != nullptr, CHIP_ERROR_INVALID_ARGUMENT);
watch->mCallback = callback;
watch->mCallbackData = data;
return CHIP_NO_ERROR;
}
CHIP_ERROR LayerImplSelect::RequestCallbackOnPendingRead(SocketWatchToken token)
{
SocketWatch * watch = reinterpret_cast<SocketWatch *>(token);
VerifyOrReturnError(watch != nullptr, CHIP_ERROR_INVALID_ARGUMENT);
watch->mPendingIO.Set(SocketEventFlags::kRead);
#if CHIP_SYSTEM_CONFIG_USE_DISPATCH
if (watch->mRdSource == nullptr)
{
// First time requesting callback for read events: install a dispatch source
dispatch_queue_t dispatchQueue = GetDispatchQueue();
if (dispatchQueue == nullptr)
{
// Note: if no dispatch queue is available, callbacks most probably will not work,
// unless, as in some tests from a test-specific local loop,
// the select based event handling (Prepare/WaitFor/HandleEvents) is invoked.
ChipLogError(DeviceLayer,
"RequestCallbackOnPendingRead with no dispatch queue: callback may not work (might be ok in tests)");
}
else
{
watch->mRdSource =
dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, static_cast<uintptr_t>(watch->mFD), 0, dispatchQueue);
ReturnErrorCodeIf(watch->mRdSource == nullptr, CHIP_ERROR_NO_MEMORY);
dispatch_source_set_event_handler(watch->mRdSource, ^{
if (watch->mPendingIO.Has(SocketEventFlags::kRead) && watch->mCallback != nullptr)
{
SocketEvents events;
events.Set(SocketEventFlags::kRead);
watch->mCallback(events, watch->mCallbackData);
}
});
// only now we are sure the source exists and can become active
dispatch_activate(watch->mRdSource);
}
}
#endif // CHIP_SYSTEM_CONFIG_USE_DISPATCH
return CHIP_NO_ERROR;
}
CHIP_ERROR LayerImplSelect::RequestCallbackOnPendingWrite(SocketWatchToken token)
{
SocketWatch * watch = reinterpret_cast<SocketWatch *>(token);
VerifyOrReturnError(watch != nullptr, CHIP_ERROR_INVALID_ARGUMENT);
watch->mPendingIO.Set(SocketEventFlags::kWrite);
#if CHIP_SYSTEM_CONFIG_USE_DISPATCH
if (watch->mWrSource == nullptr)
{
// First time requesting callback for read events: install a dispatch source
dispatch_queue_t dispatchQueue = GetDispatchQueue();
if (dispatchQueue == nullptr)
{
// Note: if no dispatch queue is available, callbacks most probably will not work,
// unless, as in some tests from a test-specific local loop,
// the select based event handling (Prepare/WaitFor/HandleEvents) is invoked.
ChipLogError(DeviceLayer,
"RequestCallbackOnPendingWrite with no dispatch queue: callback may not work (might be ok in tests)");
}
else
{
watch->mWrSource =
dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE, static_cast<uintptr_t>(watch->mFD), 0, dispatchQueue);
ReturnErrorCodeIf(watch->mWrSource == nullptr, CHIP_ERROR_NO_MEMORY);
dispatch_source_set_event_handler(watch->mWrSource, ^{
if (watch->mPendingIO.Has(SocketEventFlags::kWrite) && watch->mCallback != nullptr)
{
SocketEvents events;
events.Set(SocketEventFlags::kWrite);
watch->mCallback(events, watch->mCallbackData);
}
});
// only now we are sure the source exists and can become active
watch->mPendingIO.Set(SocketEventFlags::kWrite);
dispatch_activate(watch->mWrSource);
}
}
#endif // CHIP_SYSTEM_CONFIG_USE_DISPATCH
return CHIP_NO_ERROR;
}
CHIP_ERROR LayerImplSelect::ClearCallbackOnPendingRead(SocketWatchToken token)
{
SocketWatch * watch = reinterpret_cast<SocketWatch *>(token);
VerifyOrReturnError(watch != nullptr, CHIP_ERROR_INVALID_ARGUMENT);
watch->mPendingIO.Clear(SocketEventFlags::kRead);
return CHIP_NO_ERROR;
}
CHIP_ERROR LayerImplSelect::ClearCallbackOnPendingWrite(SocketWatchToken token)
{
SocketWatch * watch = reinterpret_cast<SocketWatch *>(token);
VerifyOrReturnError(watch != nullptr, CHIP_ERROR_INVALID_ARGUMENT);
watch->mPendingIO.Clear(SocketEventFlags::kWrite);
return CHIP_NO_ERROR;
}
CHIP_ERROR LayerImplSelect::StopWatchingSocket(SocketWatchToken * tokenInOut)
{
SocketWatch * watch = reinterpret_cast<SocketWatch *>(*tokenInOut);
*tokenInOut = InvalidSocketWatchToken();
VerifyOrReturnError(watch != nullptr, CHIP_ERROR_INVALID_ARGUMENT);
VerifyOrReturnError(watch->mFD >= 0, CHIP_ERROR_INCORRECT_STATE);
#if CHIP_SYSTEM_CONFIG_USE_DISPATCH
watch->DisableAndClear();
#else
watch->Clear();
// Wake the thread calling select so that it stops selecting on the socket.
Signal();
#endif
return CHIP_NO_ERROR;
}
/**
* Set the read, write or exception bit flags for the specified socket based on its status in
* the corresponding file descriptor sets.
*
* @param[in] socket The file descriptor for which the bit flags are being set.
*
* @param[in] readfds A pointer to the set of readable file descriptors.
*
* @param[in] writefds A pointer to the set of writable file descriptors.
*
* @param[in] exceptfds A pointer to the set of file descriptors with errors.
*/
SocketEvents LayerImplSelect::SocketEventsFromFDs(int socket, const fd_set & readfds, const fd_set & writefds,
const fd_set & exceptfds)
{
SocketEvents res;
if (socket >= 0)
{
// POSIX does not define the fd_set parameter of FD_ISSET() as const, even though it isn't modified.
if (FD_ISSET(socket, const_cast<fd_set *>(&readfds)))
res.Set(SocketEventFlags::kRead);
if (FD_ISSET(socket, const_cast<fd_set *>(&writefds)))
res.Set(SocketEventFlags::kWrite);
if (FD_ISSET(socket, const_cast<fd_set *>(&exceptfds)))
res.Set(SocketEventFlags::kExcept);
}
return res;
}
void LayerImplSelect::PrepareEvents()
{
assertChipStackLockedByCurrentThread();
const Clock::Timestamp currentTime = SystemClock().GetMonotonicTimestamp();
Clock::Timestamp awakenTime = currentTime + kDefaultMinSleepPeriod;
TimerList::Node * timer = mTimerList.Earliest();
if (timer && timer->AwakenTime() < awakenTime)
{
awakenTime = timer->AwakenTime();
}
const Clock::Timestamp sleepTime = (awakenTime > currentTime) ? (awakenTime - currentTime) : Clock::kZero;
Clock::ToTimeval(sleepTime, mNextTimeout);
mMaxFd = -1;
// NOLINTBEGIN(clang-analyzer-security.insecureAPI.bzero)
//
// NOTE: darwin uses bzero to clear out FD sets. This is not a security concern.
FD_ZERO(&mSelected.mReadSet);
FD_ZERO(&mSelected.mWriteSet);
FD_ZERO(&mSelected.mErrorSet);
// NOLINTEND(clang-analyzer-security.insecureAPI.bzero)
for (auto & w : mSocketWatchPool)
{
if (w.mFD != kInvalidFd)
{
if (mMaxFd < w.mFD)
{
mMaxFd = w.mFD;
}
if (w.mPendingIO.Has(SocketEventFlags::kRead))
{
FD_SET(w.mFD, &mSelected.mReadSet);
}
if (w.mPendingIO.Has(SocketEventFlags::kWrite))
{
FD_SET(w.mFD, &mSelected.mWriteSet);
}
}
}
}
void LayerImplSelect::WaitForEvents()
{
mSelectResult = select(mMaxFd + 1, &mSelected.mReadSet, &mSelected.mWriteSet, &mSelected.mErrorSet, &mNextTimeout);
}
void LayerImplSelect::HandleEvents()
{
assertChipStackLockedByCurrentThread();
if (!IsSelectResultValid())
{
ChipLogError(DeviceLayer, "Select failed: %" CHIP_ERROR_FORMAT, CHIP_ERROR_POSIX(errno).Format());
return;
}
#if CHIP_SYSTEM_CONFIG_POSIX_LOCKING
mHandleSelectThread = pthread_self();
#endif // CHIP_SYSTEM_CONFIG_POSIX_LOCKING
// Obtain the list of currently expired timers. Any new timers added by timer callback are NOT handled on this pass,
// since that could result in infinite handling of new timers blocking any other progress.
VerifyOrDieWithMsg(mExpiredTimers.Empty(), DeviceLayer, "Re-entry into HandleEvents from a timer callback?");
mExpiredTimers = mTimerList.ExtractEarlier(Clock::Timeout(1) + SystemClock().GetMonotonicTimestamp());
TimerList::Node * timer = nullptr;
while ((timer = mExpiredTimers.PopEarliest()) != nullptr)
{
mTimerPool.Invoke(timer);
}
for (auto & w : mSocketWatchPool)
{
if (w.mFD != kInvalidFd)
{
SocketEvents events = SocketEventsFromFDs(w.mFD, mSelected.mReadSet, mSelected.mWriteSet, mSelected.mErrorSet);
if (events.HasAny() && w.mCallback != nullptr)
{
w.mCallback(events, w.mCallbackData);
}
}
}
#if CHIP_SYSTEM_CONFIG_POSIX_LOCKING
mHandleSelectThread = PTHREAD_NULL;
#endif // CHIP_SYSTEM_CONFIG_POSIX_LOCKING
}
#if CHIP_SYSTEM_CONFIG_USE_DISPATCH
void LayerImplSelect::HandleTimerComplete(TimerList::Node * timer)
{
mTimerList.Remove(timer);
mTimerPool.Invoke(timer);
}
#endif // CHIP_SYSTEM_CONFIG_USE_DISPATCH
void LayerImplSelect::SocketWatch::Clear()
{
mFD = kInvalidFd;
mPendingIO.ClearAll();
mCallback = nullptr;
mCallbackData = 0;
#if CHIP_SYSTEM_CONFIG_USE_DISPATCH
mRdSource = nullptr;
mWrSource = nullptr;
#endif
}
#if CHIP_SYSTEM_CONFIG_USE_DISPATCH
void LayerImplSelect::SocketWatch::DisableAndClear()
{
if (mRdSource)
{
dispatch_source_cancel(mRdSource);
dispatch_release(mRdSource);
}
if (mWrSource)
{
dispatch_source_cancel(mWrSource);
dispatch_release(mWrSource);
}
Clear();
}
#endif // CHIP_SYSTEM_CONFIG_USE_DISPATCH
} // namespace System
} // namespace chip