{-# LANGUAGE NoImplicitPrelude
           , RecordWildCards
           , TemplateHaskell
           , OverloadedStrings
           , FlexibleContexts
           , ViewPatterns
           , TypeFamilies
           , DeriveGeneric
           , DeriveDataTypeable
           , QuasiQuotes
           #-}

module Jobs
  ( module Jobs.Types
  , writeJobCtl
  , queueJob
  , handleJobs
  ) where

import Import

import Jobs.Types

import Data.Conduit.TMChan
import qualified Data.Conduit.List as C

import Data.Aeson (fromJSON, toJSON)
import qualified Data.Aeson as Aeson

import Database.Persist.Sql (executeQQ, fromSqlKey)


-- | Reasons why a queued job cannot be worked on right now.
--
-- Thrown by 'jLocked' and turned into log messages by the command handler.
data JobQueueException
  = JInvalid QueuedJobId QueuedJob
    -- ^ The stored job content could not be decoded as a 'Job'
  | JLocked QueuedJobId InstanceId UTCTime
    -- ^ The job already carries a lock (locking instance and lock time)
  | JNonexistant QueuedJobId
    -- ^ No job is stored under the given id
  deriving (Read, Show, Eq, Generic, Typeable)

instance Exception JobQueueException


-- | Read control commands from 'appJobCtl' and address them as they come in
--
-- Uses `unsafeHandler`, as per documentation all HTTP-related fields of state/environment are meaningless placeholders.
-- Handling commands in `HandlerT` provides us with the facilities to render urls, unifies logging, provides a value of the foundation type, ...
handleJobs :: UniWorX -> IO ()
handleJobs foundation@UniWorX{..}
  = unsafeHandler foundation
  . bracket_ logStart logStop
  . runConduit
  $ sourceTMChan appJobCtl .| handleJobs'
  where
    logStart = $(logDebugS) "Jobs" "Started"
    logStop  = $(logDebugS) "Jobs" "Shutting down"

-- | Consume control commands one at a time.
--
-- Any synchronous exception escaping a single command is logged as an error
-- and discarded, so that one failing command does not terminate the sink.
handleJobs' :: Sink JobCtl Handler ()
handleJobs' = C.mapM_ $ void . handleAny ($(logErrorS) "Jobs" . tshow) . handleCmd
  where
    -- Queue-related failures are expected during normal operation; they are
    -- only logged, each at its own severity.
    handleQueueException :: MonadLogger m => JobQueueException -> m ()
    handleQueueException (JInvalid jId j)
      = $(logWarnS) "Jobs" $ "Invalid QueuedJob (#" ++ tshow (fromSqlKey jId) ++ "): " ++ tshow j
    handleQueueException (JNonexistant jId)
      = $(logInfoS) "Jobs" $ "Saw nonexistant queue id: " ++ tshow (fromSqlKey jId)
    handleQueueException (JLocked jId lInstance lTime)
      = $(logDebugS) "Jobs" $ "Saw locked QueuedJob: " ++ tshow (fromSqlKey jId, lInstance, lTime)

    handleCmd :: JobCtl -> Handler ()
    -- Flush: in a forked thread, re-announce every stored job (ordered by
    -- ascending creation time) as a 'JobCtlPerform' command.
    handleCmd JobCtlFlush
      = void . fork . runDB . runConduit
      $ selectKeys [] [ Asc QueuedJobCreationTime ] .| C.mapM_ (writeJobCtl . JobCtlPerform)
    -- Perform: lock the job, process it, and remove it from the queue.
    handleCmd (JobCtlPerform jId)
      = handle handleQueueException . jLocked jId $ \j@QueuedJob{..} ->
          -- 'jLocked' has already checked that the content decodes; matching
          -- both constructors here keeps this handler total regardless.
          case fromJSON queuedJobContent :: Aeson.Result Job of
            Aeson.Error _ -> throwM $ JInvalid jId j
            Aeson.Success content -> do
              $(logDebugS) "Jobs" $ "Would do: " <> tshow content -- FIXME

              runDB $ delete jId

-- | Run an action on a queued job while holding the job's lock.
--
-- Within a single database transaction (switched to @SERIALIZABLE@ isolation)
-- the job is fetched, checked and marked as locked by this instance:
--
--   * throws 'JNonexistant' if no job is stored under the given id
--   * throws 'JLocked' if both lock instance and lock time are already set
--   * throws 'JInvalid' if the stored content does not decode as a 'Job'
--
-- Afterwards the action is run on the (now locked) job. Once it finishes --
-- normally or by throwing -- the lock fields are reset to 'Nothing'.
jLocked :: QueuedJobId -> (QueuedJob -> Handler a) -> Handler a
jLocked jId act = do
  -- Records that the lock columns have been written, so that the lock is
  -- only released if it was actually acquired.
  hasLock <- liftIO $ newTVarIO False

  val <- runDB $ do
    [executeQQ|
      SET TRANSACTION ISOLATION LEVEL SERIALIZABLE
    |]

    j@QueuedJob{..} <- maybe (throwM $ JNonexistant jId) return =<< get jId
    maybe (return ()) throwM $ JLocked jId <$> queuedJobLockInstance <*> queuedJobLockTime
    case fromJSON queuedJobContent :: Aeson.Result Job of
      Aeson.Success _ -> return ()
      Aeson.Error t -> do
        $(logErrorS) "Jobs" $ "Aeson decoding error: " <> pack t
        throwM $ JInvalid jId j

    instanceID <- getsYesod appInstanceID
    now <- liftIO getCurrentTime
    val <- updateGet jId [ QueuedJobLockInstance =. Just instanceID
                         , QueuedJobLockTime     =. Just now
                         ]
    liftIO . atomically $ writeTVar hasLock True
    return val

  act val `finally` whenM (liftIO . atomically $ readTVar hasLock) jUnlock
  where
    -- Clear both lock columns of the job.
    jUnlock :: Handler ()
    jUnlock = runDB $ update jId [ QueuedJobLockInstance =. Nothing
                                 , QueuedJobLockTime     =. Nothing
                                 ]

-- | Send a control command to the job handling loop by writing it to the
-- 'appJobCtl' channel of the foundation value.
writeJobCtl :: (MonadHandler m, HandlerSite m ~ UniWorX) => JobCtl -> m ()
writeJobCtl cmd = do
  chan <- getsYesod appJobCtl
  liftIO . atomically $ writeTMChan chan cmd

-- | Store a job in the queue and announce it for execution.
--
-- The job is inserted unlocked, stamped with the current time and the id of
-- this instance; a 'JobCtlPerform' command for it is written right away.
-- Returns the id of the newly inserted queue entry.
queueJob :: Job -> YesodDB UniWorX QueuedJobId
queueJob job = do
  now <- liftIO getCurrentTime
  self <- getsYesod appInstanceID
  jId <- insert QueuedJob
    { queuedJobContent          = toJSON job
    , queuedJobCreationInstance = self
    , queuedJobCreationTime     = now
    , queuedJobLockInstance     = Nothing
    , queuedJobLockTime         = Nothing
    }
  writeJobCtl $ JobCtlPerform jId -- FIXME: Should do fancy load balancing across instances (or something)
  return jId