diff --git a/config/settings.yml b/config/settings.yml index 3437b0a5a..d8b8b0330 100644 --- a/config/settings.yml +++ b/config/settings.yml @@ -159,8 +159,13 @@ upload-cache: auto-discover-region: "_env:UPLOAD_S3_AUTO_DISCOVER_REGION:true" disable-cert-validation: "_env:UPLOAD_S3_DISABLE_CERT_VALIDATION:false" upload-cache-bucket: "uni2work-uploads" -inject-files: 10 -file-upload-db-chunksize: 1048576 # 1MiB + +inject-files: 307 +rechunk-files: 601 + +file-upload-db-chunksize: 4194304 # 4MiB +file-chunking-target-exponent: 21 # 2MiB +file-chunking-hash-window: 4096 server-sessions: idle-timeout: 28807 @@ -231,6 +236,9 @@ token-buckets: depth: 1572864000 # 1500MiB inv-rate: 1.9e-6 # 2MiB/s initial-value: 0 - + rechunk-files: + depth: 20971520 # 20MiB + inv-rate: 9.5e-7 # 1MiB/s + initial-value: 0 fallback-personalised-sheet-files-keys-expire: 2419200 diff --git a/package.yaml b/package.yaml index 57b7c7b8a..72a9c7448 100644 --- a/package.yaml +++ b/package.yaml @@ -151,6 +151,7 @@ dependencies: - minio-hs - network-ip - data-textual + - fastcdc other-extensions: - GeneralizedNewtypeDeriving diff --git a/src/Handler/Utils/Files.hs b/src/Handler/Utils/Files.hs index 2375bd4e7..fafad4903 100644 --- a/src/Handler/Utils/Files.hs +++ b/src/Handler/Utils/Files.hs @@ -2,6 +2,7 @@ module Handler.Utils.Files ( sourceFile, sourceFile' , sourceFiles, sourceFiles' , SourceFilesException(..) + , sourceFileDB ) where import Import @@ -23,6 +24,28 @@ data SourceFilesException deriving anyclass (Exception) +sourceFileDB :: (MonadThrow m, MonadHandler m, HandlerSite m ~ UniWorX) + => FileContentReference -> ConduitT () ByteString (SqlPersistT m) () +sourceFileDB fileReference = do + dbChunksize <- getsYesod $ view _appFileUploadDBChunksize + let retrieveChunk chunkHash = \case + Nothing -> return Nothing + Just start -> do + chunk <- E.selectMaybe . E.from $ \fileContentChunk -> do + E.where_ $ fileContentChunk E.^. FileContentChunkId E.==. E.val chunkHash + return $ E.substring (fileContentChunk E.^. FileContentChunkContent) (E.val start) (E.val dbChunksize) + case chunk of + Nothing -> throwM SourceFilesContentUnavailable + Just (E.Value c) -> return . Just . (c, ) $ if + | olength c >= dbChunksize -> Just $ start + dbChunksize + | otherwise -> Nothing + chunkHashes = E.selectSource . E.from $ \fileContentEntry -> do + E.where_ $ fileContentEntry E.^. FileContentEntryHash E.==. E.val fileReference + E.orderBy [ E.asc $ fileContentEntry E.^. FileContentEntryIx ] + return $ fileContentEntry E.^. FileContentEntryChunkHash + chunkHashes .| C.map E.unValue .| awaitForever (\chunkHash -> C.unfoldM (retrieveChunk chunkHash) $ Just (1 :: Int)) + + sourceFiles :: Monad m => ConduitT FileReference DBFile m () sourceFiles = C.map sourceFile @@ -39,24 +62,7 @@ sourceFile FileReference{..} = File toFileContent fileReference = do inDB <- lift . E.selectExists . E.from $ \fileContentEntry -> E.where_ $ fileContentEntry E.^. FileContentEntryHash E.==. E.val fileReference if - | inDB -> do - dbChunksize <- getsYesod $ view _appFileUploadDBChunksize - let retrieveChunk chunkHash = \case - Nothing -> return Nothing - Just start -> do - chunk <- E.selectMaybe . E.from $ \fileContentChunk -> do - E.where_ $ fileContentChunk E.^. FileContentChunkId E.==. E.val chunkHash - return $ E.substring (fileContentChunk E.^. FileContentChunkContent) (E.val start) (E.val dbChunksize) - case chunk of - Nothing -> throwM SourceFilesContentUnavailable - Just (E.Value c) -> return . Just . 
(c, ) $ if - | olength c >= dbChunksize -> Just $ start + dbChunksize - | otherwise -> Nothing - chunkHashes = E.selectSource . E.from $ \fileContentEntry -> do - E.where_ $ fileContentEntry E.^. FileContentEntryHash E.==. E.val fileReference - E.orderBy [ E.asc $ fileContentEntry E.^. FileContentEntryIx ] - return $ fileContentEntry E.^. FileContentEntryChunkHash - chunkHashes .| C.map E.unValue .| awaitForever (\chunkHash -> C.unfoldM (retrieveChunk chunkHash) $ Just (1 :: Int)) + | inDB -> sourceFileDB fileReference | otherwise -> do chunkVar <- newEmptyTMVarIO minioAsync <- lift . allocateLinkedAsync $ diff --git a/src/Jobs.hs b/src/Jobs.hs index 320b9f56a..21925bebc 100644 --- a/src/Jobs.hs +++ b/src/Jobs.hs @@ -5,7 +5,7 @@ module Jobs , stopJobCtl ) where -import Import +import Import hiding (StateT) import Jobs.Types as Types hiding (JobCtl(JobCtlQueue)) import Jobs.Queue import Jobs.Crontab @@ -30,6 +30,7 @@ import qualified Data.Map.Strict as Map import Data.Map.Strict ((!)) import Control.Monad.Trans.RWS.Lazy (RWST, mapRWST, evalRWST) +import Control.Monad.Trans.State.Strict (StateT, evalStateT) import qualified Control.Monad.State.Class as State import Control.Monad.Writer.Class (MonadWriter(..)) import Control.Monad.Trans.Cont (ContT(..), callCC) @@ -99,6 +100,7 @@ handleJobs foundation@UniWorX{..} jobConfirm <- liftIO $ newTVarIO HashMap.empty jobShutdown <- liftIO newEmptyTMVarIO jobCurrentCrontab <- liftIO $ newTVarIO Nothing + jobHeldLocks <- liftIO $ newTVarIO Set.empty atomically $ putTMVar appJobState JobState { jobContext = JobContext{..} , .. @@ -414,13 +416,14 @@ handleJobs' wNum = C.mapM_ $ \jctl -> withJobWorkerState wNum JobWorkerBusy $ do handleCmd JobCtlTest = return () handleCmd JobCtlFlush = void . lift . runDB . runConduit $ selectKeys [] [ Asc QueuedJobCreationTime ] .| C.mapM_ (\j -> lift $ runReaderT (writeJobCtl $ JobCtlPerform j) =<< getYesod) handleCmd (JobCtlQueue job) = lift $ queueJob' job - handleCmd (JobCtlPerform jId) = lift . handle handleQueueException . jLocked jId $ \j@QueuedJob{..} -> do + handleCmd (JobCtlPerform jId) = handle handleQueueException . jLocked jId $ \(Entity _ j@QueuedJob{..}) -> lift $ do content <- case fromJSON queuedJobContent of Aeson.Success c -> return c Aeson.Error t -> do $logErrorS logIdent $ "Aeson decoding error: " <> pack t throwM $ JInvalid jId j + $logInfoS logIdent $ tshow content $logDebugS logIdent . LT.toStrict . decodeUtf8 $ Aeson.encode content instanceID' <- getsYesod $ view instanceID @@ -466,40 +469,45 @@ handleJobs' wNum = C.mapM_ $ \jctl -> withJobWorkerState wNum JobWorkerBusy $ do . Set.filter (((/=) `on` classifyHealthReport) newReport . snd) atomically . modifyTVar' hrStorage $ force . updateReports -jLocked :: QueuedJobId -> (QueuedJob -> Handler a) -> Handler a -jLocked jId act = do - hasLock <- liftIO $ newTVarIO False - +jLocked :: QueuedJobId -> (Entity QueuedJob -> ReaderT JobContext Handler a) -> ReaderT JobContext Handler a +jLocked jId act = flip evalStateT False $ do let - lock = runDB . setSerializable $ do - qj@QueuedJob{..} <- maybe (throwM $ JNonexistant jId) return =<< get jId + lock :: StateT Bool (ReaderT JobContext Handler) (Entity QueuedJob) + lock = hoist (hoist $ runDB . setSerializable) $ do + qj@QueuedJob{..} <- lift . 
lift $ maybe (throwM $ JNonexistant jId) return =<< get jId instanceID' <- getsYesod $ view instanceID threshold <- getsYesod $ view _appJobStaleThreshold now <- liftIO getCurrentTime + heldLocks <- asks jobHeldLocks + isHeld <- (jId `Set.member`) <$> atomically (readTVar heldLocks) hadStale <- maybeT (return False) $ do lockTime <- MaybeT $ return queuedJobLockTime lockInstance <- MaybeT $ return queuedJobLockInstance if | lockInstance == instanceID' , diffUTCTime now lockTime >= threshold + , not isHeld -> return True | otherwise -> throwM $ JLocked jId lockInstance lockTime when hadStale . $logWarnS "Jobs" $ "Ignored stale lock: " <> tshow (Entity jId qj) - val <- updateGet jId [ QueuedJobLockInstance =. Just instanceID' - , QueuedJobLockTime =. Just now - ] - liftIO . atomically $ writeTVar hasLock True - return val + State.put True + val <- lift . lift $ updateGet jId [ QueuedJobLockInstance =. Just instanceID' + , QueuedJobLockTime =. Just now + ] + atomically . modifyTVar' heldLocks $ Set.insert jId + return $ Entity jId val - unlock = whenM (readTVarIO hasLock) $ - runDB . setSerializable $ - update jId [ QueuedJobLockInstance =. Nothing - , QueuedJobLockTime =. Nothing - ] + unlock :: Entity QueuedJob -> StateT Bool (ReaderT JobContext Handler) () + unlock (Entity jId' _) = whenM State.get $ do + atomically . flip modifyTVar' (Set.delete jId') =<< asks jobHeldLocks + lift . lift . runDB . setSerializable $ + update jId' [ QueuedJobLockInstance =. Nothing + , QueuedJobLockTime =. Nothing + ] - bracket lock (const unlock) act + bracket lock unlock $ lift . act pruneLastExecs :: Crontab JobCtl -> DB () diff --git a/src/Jobs/Crontab.hs b/src/Jobs/Crontab.hs index ae6897a48..84a73489e 100644 --- a/src/Jobs/Crontab.hs +++ b/src/Jobs/Crontab.hs @@ -89,6 +89,15 @@ determineCrontab = execWriterT $ do , cronRateLimit = iInterval , cronNotAfter = Right CronNotScheduled } + whenIsJust appRechunkFiles $ \rInterval -> + tell $ HashMap.singleton + (JobCtlQueue JobRechunkFiles) + Cron + { cronInitial = CronAsap + , cronRepeat = CronRepeatScheduled CronAsap + , cronRateLimit = rInterval + , cronNotAfter = Right CronNotScheduled + } tell . flip foldMap universeF $ \kind -> case appHealthCheckInterval kind of diff --git a/src/Jobs/Handler/Files.hs b/src/Jobs/Handler/Files.hs index 37d16d58c..80cac055c 100644 --- a/src/Jobs/Handler/Files.hs +++ b/src/Jobs/Handler/Files.hs @@ -1,7 +1,7 @@ module Jobs.Handler.Files ( dispatchJobPruneSessionFiles , dispatchJobPruneUnreferencedFiles - , dispatchJobInjectFiles + , dispatchJobInjectFiles, dispatchJobRechunkFiles ) where import Import hiding (matching, maximumBy, init) @@ -30,6 +30,8 @@ import Control.Monad.Random.Lazy import System.Random.Shuffle (shuffleM) import System.IO.Unsafe +import Handler.Utils.Files (sourceFileDB) + dispatchJobPruneSessionFiles :: JobHandler UniWorX dispatchJobPruneSessionFiles = JobHandlerAtomic . hoist lift $ do @@ -216,7 +218,7 @@ dispatchJobInjectFiles = JobHandlerException . maybeT (return ()) $ do chunkVar <- newEmptyTMVarIO dbAsync <- allocateLinkedAsync $ do atomically $ isEmptyTMVar chunkVar >>= guard . not - sinkFileDB $ C.unfoldM (\x -> fmap (, x) <$> atomically (takeTMVar chunkVar)) () + sinkFileDB False $ C.unfoldM (\x -> fmap (, x) <$> atomically (takeTMVar chunkVar)) () didSend <- maybeT (return False) . hoistMaybeM . runAppMinio . 
runMaybeT $ do
+        objRes <- catchIfMaybeT minioIsDoesNotExist $ Minio.getObject uploadBucket obj Minio.defaultGetObjectOptions
@@ -245,3 +247,47 @@
     when (inj > 0) $
       $logInfoS "InjectFiles" [st|Injected #{inj} files from upload cache into database|]
+
+
+data RechunkFileException
+  = RechunkFileExceptionHashMismatch
+    { oldHash, newHash :: FileContentReference }
+  deriving (Eq, Ord, Show, Generic, Typeable)
+  deriving anyclass (Exception)
+
+dispatchJobRechunkFiles :: JobHandler UniWorX
+dispatchJobRechunkFiles = JobHandlerAtomic . hoist lift $ do
+  interval <- getsYesod $ view _appRechunkFiles
+  let
+    getEntryCandidates = E.selectSource . E.from $ \fileContentEntry -> E.distinctOnOrderBy [E.asc $ fileContentEntry E.^. FileContentEntryHash] $ do
+      E.where_ . E.exists . E.from $ \(fileContentEntry' `E.InnerJoin` fileContentChunk) -> do
+        E.on $ fileContentChunk E.^. FileContentChunkId E.==. fileContentEntry' E.^. FileContentEntryChunkHash
+        E.where_ $ fileContentEntry' E.^. FileContentEntryHash E.==. fileContentEntry E.^. FileContentEntryHash
+        E.where_ . E.not_ $ fileContentChunk E.^. FileContentChunkContentBased
+
+      let size = E.subSelectMaybe . E.from $ \(fileContentEntry' `E.InnerJoin` fileContentChunk) -> do
+            E.on $ fileContentChunk E.^. FileContentChunkId E.==. fileContentEntry' E.^. FileContentEntryChunkHash
+            E.where_ $ fileContentEntry' E.^. FileContentEntryHash E.==. fileContentEntry E.^. FileContentEntryHash
+            return $ E.sum_ (E.length_ $ fileContentChunk E.^. FileContentChunkContent :: E.SqlExpr (E.Value Word64))
+
+      return $ ( fileContentEntry E.^. FileContentEntryHash
+               , size
+               )
+
+    rechunkFile :: FileContentReference -> Word64 -> DB (Sum Natural, Sum Word64)
+    rechunkFile fRef sz = do
+      fRef' <- sinkFileDB True $ sourceFileDB fRef
+      unless (fRef == fRef') $
+        throwM $ RechunkFileExceptionHashMismatch fRef fRef'
+      return (Sum 1, Sum sz)
+
+  (Sum rechunkedEntries, Sum rechunkedSize) <- runConduit $
+    getEntryCandidates
+    .| C.mapMaybe (\(E.Value fRef, E.Value sz) -> (fRef, ) <$> sz)
+    .| maybe (C.map id) (takeWhileTime . (/ 2)) interval
+    .| persistentTokenBucketTakeC' TokenBucketRechunkFiles (view _2 :: _ -> Word64)
+    .| C.mapM (uncurry rechunkFile)
+    .| C.fold
+
+  when (rechunkedEntries > 0 || rechunkedSize > 0) $
+    $logInfoS "RechunkFiles" [st|Rechunked #{tshow rechunkedEntries} files in database (#{tshow rechunkedSize} bytes)|]
diff --git a/src/Jobs/Handler/TransactionLog.hs b/src/Jobs/Handler/TransactionLog.hs
index fb856ba2c..131ba2491 100644
--- a/src/Jobs/Handler/TransactionLog.hs
+++ b/src/Jobs/Handler/TransactionLog.hs
@@ -27,5 +27,5 @@ dispatchJobDeleteTransactionLogIPs = JobHandlerAtomic . hoist lift $ do
   retentionTime <- getsYesod $ view _appTransactionLogIPRetentionTime
   let cutoff = addUTCTime (- retentionTime) now
 
-  n <- updateWhereCount [ TransactionLogTime <. cutoff ] [ TransactionLogRemote =. Nothing ]
+  n <- updateWhereCount [ TransactionLogTime <. cutoff, TransactionLogRemote !=. Nothing ] [ TransactionLogRemote =. Nothing ]
   $logInfoS "DeleteTransactionLogIPs" [st|Deleted #{n} IP entries from transaction log|]
diff --git a/src/Jobs/Queue.hs b/src/Jobs/Queue.hs
index b5483a8c6..bb2faf762 100644
--- a/src/Jobs/Queue.hs
+++ b/src/Jobs/Queue.hs
@@ -82,7 +82,7 @@ writeJobCtlBlock = writeJobCtlBlock' writeJobCtl
 queueJobUnsafe :: Bool -> Job -> YesodDB UniWorX (Maybe QueuedJobId)
 queueJobUnsafe queuedJobWriteLastExec job = do
-  $logInfoS "queueJob" $ tshow job
+  $logDebugS "queueJob" $ tshow job
 
   doQueue <- fmap not . and2M (return $ jobNoQueueSame job) $ exists [ QueuedJobContent ==. toJSON job ]
diff --git a/src/Jobs/Types.hs b/src/Jobs/Types.hs
index 2d7113649..59a15c99a 100644
--- a/src/Jobs/Types.hs
+++ b/src/Jobs/Types.hs
@@ -92,6 +92,7 @@ data Job = JobSendNotification { jRecipient :: UserId, jNotification :: Notifica
                  }
          | JobInjectFiles
          | JobPruneFallbackPersonalisedSheetFilesKeys
+         | JobRechunkFiles
   deriving (Eq, Ord, Show, Read, Generic, Typeable)
 data Notification = NotificationSubmissionRated { nSubmission :: SubmissionId }
                   | NotificationSheetActive { nSheet :: SheetId }
@@ -226,6 +227,7 @@ newWorkerId = JobWorkerId <$> liftIO newUnique
 data JobContext = JobContext
   { jobCrontab :: TVar (Crontab JobCtl)
   , jobConfirm :: TVar (HashMap JobCtl (NonEmpty (TMVar (Maybe SomeException))))
+  , jobHeldLocks :: TVar (Set QueuedJobId)
   }
@@ -254,6 +256,8 @@ jobNoQueueSame = \case
   JobPruneSessionFiles{} -> True
   JobPruneUnreferencedFiles{} -> True
   JobInjectFiles{} -> True
+  JobPruneFallbackPersonalisedSheetFilesKeys{} -> True
+  JobRechunkFiles{} -> True
   _ -> False
diff --git a/src/Model/Migration.hs b/src/Model/Migration.hs
index e52ecd577..519944cc4 100644
--- a/src/Model/Migration.hs
+++ b/src/Model/Migration.hs
@@ -929,7 +929,7 @@ customMigrations = Map.fromListWith (>>)
           CREATE TABLE file_content_chunk_unreferenced (id bigserial, hash bytea NOT NULL, since timestamp with time zone NOT NULL);
           INSERT INTO file_content_chunk_unreferenced (since, hash) (SELECT unreferenced_since as since, hash FROM file_content_chunk WHERE NOT (unreferenced_since IS NULL));
-          ALTER TABLE file_content_chunk_chunk DROP COLUMN unreferenced_since;
+          ALTER TABLE file_content_chunk DROP COLUMN unreferenced_since;
           CREATE TABLE file_content_entry (hash bytea NOT NULL, ix bigint NOT NULL, chunk_hash bytea NOT NULL);
           INSERT INTO file_content_entry (hash, chunk_hash, ix) (SELECT hash, hash as chunk_hash, 0 as ix FROM file_content_chunk);
diff --git a/src/Model/Types/Misc.hs b/src/Model/Types/Misc.hs
index 916bd2df9..7eee837b9 100644
--- a/src/Model/Types/Misc.hs
+++ b/src/Model/Types/Misc.hs
@@ -267,6 +267,7 @@ instance Csv.FromField Sex where
 data TokenBucketIdent = TokenBucketInjectFiles
                       | TokenBucketPruneFiles
+                      | TokenBucketRechunkFiles
   deriving (Eq, Ord, Read, Show, Enum, Bounded, Generic, Typeable)
   deriving anyclass (Universe, Finite, Hashable)
diff --git a/src/Settings.hs b/src/Settings.hs
index f604dddf2..a97ece5b4 100644
--- a/src/Settings.hs
+++ b/src/Settings.hs
@@ -70,6 +72,8 @@ import Text.Show (showParen, showString)
 import qualified Data.List.PointedList as P
 import qualified Network.Minio as Minio
+
+import Data.Conduit.Algorithms.FastCDC
 
 -- | Runtime settings to configure this application. These settings can be
@@ -174,8 +176,11 @@ data AppSettings = AppSettings
   , appUploadCacheConf :: Maybe Minio.ConnectInfo
   , appUploadCacheBucket :: Minio.Bucket
   , appInjectFiles :: Maybe NominalDiffTime
+  , appRechunkFiles :: Maybe NominalDiffTime
   , appFileUploadDBChunksize :: Int
+  , appFileChunkingParams :: FastCDCParameters
+
   , appFavouritesQuickActionsBurstsize
   , appFavouritesQuickActionsAvgInverseRate :: Word64
   , appFavouritesQuickActionsTimeout :: DiffTime
@@ -476,8 +481,13 @@ instance FromJSON AppSettings where
     appSessionFilesExpire <- o .: "session-files-expire"
     appKeepUnreferencedFiles <- o .:? "keep-unreferenced-files" .!= 0
     appInjectFiles <- o .:? "inject-files"
+    appRechunkFiles <- o .:? "rechunk-files"
     appFileUploadDBChunksize <- o .: "file-upload-db-chunksize"
+    appFileChunkingTargetExponent <- o .: "file-chunking-target-exponent"
+    appFileChunkingHashWindow <- o .: "file-chunking-hash-window"
+    appFileChunkingParams <- maybe (fail "Could not derive FastCDCParameters from file-chunking-target-exponent and file-chunking-hash-window") return $ recommendFastCDCParameters appFileChunkingTargetExponent appFileChunkingHashWindow
+
     appPruneUnreferencedFilesWithin <- o .: "prune-unreferenced-files-within"
     appPruneUnreferencedFilesInterval <- o .: "prune-unreferenced-files-interval"
diff --git a/src/Utils/Files.hs b/src/Utils/Files.hs
index fadc7f23d..10228a94b 100644
--- a/src/Utils/Files.hs
+++ b/src/Utils/Files.hs
@@ -32,23 +32,31 @@ import qualified Data.UUID.V4 as UUID
 import qualified Database.Esqueleto as E
 import qualified Database.Esqueleto.Utils as E
+import Data.Conduit.Algorithms.FastCDC (fastCDC)
+
 sinkFileDB :: (MonadCatch m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m, BackendCompatible SqlBackend (YesodPersistBackend UniWorX), YesodPersist UniWorX)
-           => ConduitT () ByteString (SqlPersistT m) () -> SqlPersistT m FileContentReference
-sinkFileDB fileContentContent = do
-  dbChunksize <- getsYesod $ view _appFileUploadDBChunksize
+           => Bool -- ^ Replace existing entries? Use only within a serializable transaction
+           -> ConduitT () ByteString (SqlPersistT m) ()
+           -> SqlPersistT m FileContentReference
+sinkFileDB doReplace fileContentContent = do
+  chunkingParams <- getsYesod $ view _appFileChunkingParams
   let sinkChunk fileContentChunkContent = do
         fileChunkLockTime <- liftIO getCurrentTime
         fileChunkLockInstance <- getsYesod appInstanceID
         tellM $ Set.singleton <$> insert FileChunkLock{ fileChunkLockHash = fileContentChunkHash, .. }
-        lift . handleIfSql isUniqueConstraintViolation (const $ return ()) $
-          insert_ FileContentChunk{..}
+        existsChunk <- lift $ exists [FileContentChunkHash ==. fileContentChunkHash]
+        let setContentBased = updateWhere [FileContentChunkHash ==. fileContentChunkHash] [FileContentChunkContentBased =. fileContentChunkContentBased]
+        if | existsChunk -> lift setContentBased
+           | otherwise -> lift . handleIfSql isUniqueConstraintViolation (const $ setContentBased) $
+               insert_ FileContentChunk{..}
        return $ FileContentChunkKey fileContentChunkHash
        where fileContentChunkHash = _Wrapped # Crypto.hash fileContentChunkContent
-  ((review _Wrapped -> fileContentHash, fileContentChunks), chunkLocks) <- runConduit . runWriterC $ transPipe lift fileContentContent
-    .| C.chunksOfE dbChunksize
+  ((review _Wrapped -> fileContentHash, fileContentChunks), chunkLocks) <- runConduit . 
runWriterC $ + transPipe lift fileContentContent + .| fastCDC chunkingParams .| C.mapM (\c -> (c, ) <$> sinkChunk c) .| transPipe lift (getZipConduit $ (,) <$> ZipConduit (C.map (view _1) .| Crypto.sinkHash) <*> ZipConduit (C.foldMap $ views _2 Seq.singleton)) @@ -63,14 +71,19 @@ sinkFileDB fileContentContent = do deleteWhere [ FileChunkLockId <-. Set.toList chunkLocks ] let entryExists = E.selectExists . E.from $ \fileContentEntry -> E.where_ $ fileContentEntry E.^. FileContentEntryHash E.==. E.val fileContentHash - unlessM entryExists . void $ - insertMany_ [ FileContentEntry{ fileContentEntryHash = fileContentHash, .. } - | fileContentEntryChunkHash <- otoList fileContentChunks - | fileContentEntryIx <- [0..] - ] + insertEntries = handleIfSql isUniqueConstraintViolation (const $ return ()) . void $ insertMany_ + [ FileContentEntry{ fileContentEntryHash = fileContentHash, .. } + | fileContentEntryChunkHash <- otoList fileContentChunks + | fileContentEntryIx <- [0..] + ] + if | not doReplace -> unlessM entryExists insertEntries + | otherwise -> do + deleteWhere [ FileContentEntryHash ==. fileContentHash ] + insertEntries + return fileContentHash - where fileContentChunkContentBased = False + where fileContentChunkContentBased = True sinkFiles :: (MonadCatch m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m, BackendCompatible SqlBackend (YesodPersistBackend UniWorX), YesodPersist UniWorX) => ConduitT (File (SqlPersistT m)) FileReference (SqlPersistT m) () @@ -86,7 +99,7 @@ sinkFile File{ fileContent = Just fileContentContent, .. } = do (unsealConduitT -> fileContentContent', isEmpty) <- fileContentContent $$+ is _Nothing <$> C.peekE fileContentHash <- if - | not isEmpty -> maybeT (sinkFileDB fileContentContent') $ do + | not isEmpty -> maybeT (sinkFileDB False fileContentContent') $ do uploadBucket <- getsYesod $ views appSettings appUploadCacheBucket chunk <- liftIO newEmptyMVar let putChunks = do diff --git a/stack.yaml b/stack.yaml index bedc224c5..39a517f26 100644 --- a/stack.yaml +++ b/stack.yaml @@ -47,6 +47,12 @@ extra-deps: - filepath-crypto - uuid-crypto + - git: https://github.com/gkleen/FastCDC.git + commit: 7326e2931454282df9081105dad812845db5c530 + subdirs: + - gearhash + - fastcdc + - generic-lens-1.2.0.0@sha256:b19e7970c93743a46bc3702331512a96d163de4356472f2d51a2945887aefe8c,6524 # manual downgrade; won't compile with >=2.0.0.0 - acid-state-0.16.0.1@sha256:d43f6ee0b23338758156c500290c4405d769abefeb98e9bc112780dae09ece6f,6207 diff --git a/stack.yaml.lock b/stack.yaml.lock index 9dd64cf44..f6232b055 100644 --- a/stack.yaml.lock +++ b/stack.yaml.lock @@ -5,9 +5,6 @@ packages: - completed: - cabal-file: - size: 4229 - sha256: 0dcfe3c4a67be4e96e1ae2e3c4b8744bc11e094853005a32f6074ab776caa3a9 name: encoding version: 0.8.2 git: git@gitlab2.rz.ifi.lmu.de:uni2work/encoding.git @@ -19,9 +16,6 @@ packages: git: git@gitlab2.rz.ifi.lmu.de:uni2work/encoding.git commit: 22fc3bb14841d8d50997aa47f1be3852e666f787 - completed: - cabal-file: - size: 2399 - sha256: 20cdf97602abb8fd7356c1a64c69fa857e34ab4cfe7834460d2ad783f7e4e4e3 name: memcached-binary version: 0.2.0 git: git@gitlab2.rz.ifi.lmu.de:uni2work/memcached-binary.git @@ -33,9 +27,6 @@ packages: git: git@gitlab2.rz.ifi.lmu.de:uni2work/memcached-binary.git commit: b7071df50bad3a251a544b984e4bf98fa09b8fae - completed: - cabal-file: - size: 1423 - sha256: 49818ee0de2d55cbfbc15ca4de1761c3adac6ba3dfcdda960b413cad4f4fa47f name: conduit-resumablesink version: '0.3' git: git@gitlab2.rz.ifi.lmu.de:uni2work/conduit-resumablesink.git @@ 
-47,9 +38,6 @@ packages: git: git@gitlab2.rz.ifi.lmu.de:uni2work/conduit-resumablesink.git commit: cbea6159c2975d42f948525e03e12fc390da53c5 - completed: - cabal-file: - size: 2069 - sha256: 9192ac19ea5da3cd4b8c86a4266592aff7b9256311aa5f42ae6de94ccacf1366 name: HaskellNet version: 0.5.1 git: git://github.com/jtdaugherty/HaskellNet.git @@ -61,9 +49,6 @@ packages: git: git://github.com/jtdaugherty/HaskellNet.git commit: 5aa1f3b009253b02c4822005ac59ee208a10a347 - completed: - cabal-file: - size: 1934 - sha256: 9fbe7c3681e963eea213ab38be17966bb690788c1c55a67257916b677d7d2ec2 name: HaskellNet-SSL version: 0.3.4.1 git: git@gitlab2.rz.ifi.lmu.de:uni2work/HaskellNet-SSL.git @@ -75,9 +60,6 @@ packages: git: git@gitlab2.rz.ifi.lmu.de:uni2work/HaskellNet-SSL.git commit: 40393c938111ac78232dc2c7eec5edb4a22d03e8 - completed: - cabal-file: - size: 2208 - sha256: 48f6e03d8f812bd24e2601497ffe9c8a78907fa2266ba05abeefdfe99221617d name: ldap-client version: 0.4.0 git: git@gitlab2.rz.ifi.lmu.de:uni2work/ldap-client.git @@ -90,9 +72,6 @@ packages: commit: 01afaf599ba6f8a9d804c269e91d3190b249d3f0 - completed: subdir: serversession - cabal-file: - size: 2081 - sha256: a958ff0007e5084e3e4c2a33acc9860c31186105f02f8ab99ecb847a7a8f9497 name: serversession version: 1.0.1 git: git@gitlab2.rz.ifi.lmu.de:uni2work/serversession.git @@ -106,9 +85,6 @@ packages: commit: 1c95b0100471279413485411032d639881012a5e - completed: subdir: serversession-backend-acid-state - cabal-file: - size: 1875 - sha256: 6cc9d29e788334670bc102213a8aae73bc1b8b0a00c416f06d232376750443b7 name: serversession-backend-acid-state version: 1.0.3 git: git@gitlab2.rz.ifi.lmu.de:uni2work/serversession.git @@ -121,9 +97,6 @@ packages: git: git@gitlab2.rz.ifi.lmu.de:uni2work/serversession.git commit: 1c95b0100471279413485411032d639881012a5e - completed: - cabal-file: - size: 1966 - sha256: 92d6d7be95a1ee6cb9783deb35e0d4e4959c7de20d518a45370084b57e20ba51 name: xss-sanitize version: 0.3.6 git: git@gitlab2.rz.ifi.lmu.de:uni2work/xss-sanitize.git @@ -136,9 +109,6 @@ packages: commit: 074ed7c8810aca81f60f2c535f9e7bad67e9d95a - completed: subdir: colonnade - cabal-file: - size: 2020 - sha256: 28f603d097aee65ddf8fe032e7e0f87523a58c516253cba196922027c8fd54d5 name: colonnade version: 1.2.0.2 git: git@gitlab2.rz.ifi.lmu.de:uni2work/colonnade.git @@ -151,9 +121,6 @@ packages: git: git@gitlab2.rz.ifi.lmu.de:uni2work/colonnade.git commit: f8170266ab25b533576e96715bedffc5aa4f19fa - completed: - cabal-file: - size: 9845 - sha256: 674630347209bc5f7984e8e9d93293510489921f2d2d6092ad1c9b8c61b6560a name: minio-hs version: 1.5.2 git: git@gitlab2.rz.ifi.lmu.de:uni2work/minio-hs.git @@ -166,9 +133,6 @@ packages: commit: 42103ab247057c04c8ce7a83d9d4c160713a3df1 - completed: subdir: cryptoids-class - cabal-file: - size: 1155 - sha256: 1fa96858ded816798f8e1c77d7945185c0d7ceb2536185d39fc72496da8a0125 name: cryptoids-class version: 0.0.0 git: git@gitlab2.rz.ifi.lmu.de:uni2work/cryptoids.git @@ -182,9 +146,6 @@ packages: commit: 4d91394475b144ea5bf7ba111f93756cc0de8a3f - completed: subdir: cryptoids-types - cabal-file: - size: 1214 - sha256: ee8966212554a156f2de236d4f005ff3a9d3098778ff6cc3f114ccaa0aff8825 name: cryptoids-types version: 1.0.0 git: git@gitlab2.rz.ifi.lmu.de:uni2work/cryptoids.git @@ -198,9 +159,6 @@ packages: commit: 4d91394475b144ea5bf7ba111f93756cc0de8a3f - completed: subdir: cryptoids - cabal-file: - size: 1505 - sha256: fcf07cd0dca21db976c25cbdf4dcc5c747cebcb7bf14c05804c8ae14223f6046 name: cryptoids version: 0.5.1.0 git: 
git@gitlab2.rz.ifi.lmu.de:uni2work/cryptoids.git @@ -214,9 +172,6 @@ packages: commit: 4d91394475b144ea5bf7ba111f93756cc0de8a3f - completed: subdir: filepath-crypto - cabal-file: - size: 1716 - sha256: 218da063bb7b00e3728deebf830904174b2b78bc29b3f203e6824b8caac92788 name: filepath-crypto version: 0.1.0.0 git: git@gitlab2.rz.ifi.lmu.de:uni2work/cryptoids.git @@ -230,9 +185,6 @@ packages: commit: 4d91394475b144ea5bf7ba111f93756cc0de8a3f - completed: subdir: uuid-crypto - cabal-file: - size: 1460 - sha256: 1db54db1b85303e50cec3c99ddb8de6c9bedc388fa9ce5a1fce61520023b9ee5 name: uuid-crypto version: 1.4.0.0 git: git@gitlab2.rz.ifi.lmu.de:uni2work/cryptoids.git @@ -244,6 +196,32 @@ packages: subdir: uuid-crypto git: git@gitlab2.rz.ifi.lmu.de:uni2work/cryptoids.git commit: 4d91394475b144ea5bf7ba111f93756cc0de8a3f +- completed: + subdir: gearhash + name: gearhash + version: 0.0.0 + git: https://github.com/gkleen/FastCDC.git + pantry-tree: + size: 504 + sha256: 61a08cdd003dc8a418f410dacbcc3a91adea4c23864f8ddbaba3b762e4c2924d + commit: 7326e2931454282df9081105dad812845db5c530 + original: + subdir: gearhash + git: https://github.com/gkleen/FastCDC.git + commit: 7326e2931454282df9081105dad812845db5c530 +- completed: + subdir: fastcdc + name: fastcdc + version: 0.0.0 + git: https://github.com/gkleen/FastCDC.git + pantry-tree: + size: 244 + sha256: 80ae3d79344f7c6a73a8d7adf07ff2b7f0736fd34760b4b8a15e26f32d9773f6 + commit: 7326e2931454282df9081105dad812845db5c530 + original: + subdir: fastcdc + git: https://github.com/gkleen/FastCDC.git + commit: 7326e2931454282df9081105dad812845db5c530 - completed: hackage: generic-lens-1.2.0.0@sha256:b19e7970c93743a46bc3702331512a96d163de4356472f2d51a2945887aefe8c,6524 pantry-tree:
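
Notes on the new chunking scheme

file-chunking-target-exponent: 21 asks FastCDC for an average chunk of
2^21 bytes (2MiB), file-chunking-hash-window: 4096 sets the rolling-hash
window, and file-upload-db-chunksize grows from 1MiB to 4MiB so that one
file_content_chunk row comfortably holds a typical chunk. The sketch below
shows the size derivation suggested in the FastCDC paper; it only
illustrates the orders of magnitude and is an assumption, not the actual
recommendFastCDCParameters from the fastcdc package, whose choices may
differ.

    -- Assumed derivation, for illustration only; the real
    -- recommendFastCDCParameters may pick different bounds.
    data ChunkSizes = ChunkSizes { minSize, avgSize, maxSize :: Int }
      deriving Show

    sizesFromExponent :: Int -> ChunkSizes
    sizesFromExponent e = ChunkSizes (avg `div` 4) avg (avg * 4)
      where avg = 2 ^ e  -- target exponent 21 ~> 2MiB average chunks

    main :: IO ()
    main = print (sizesFromExponent 21)
    -- ChunkSizes {minSize = 524288, avgSize = 2097152, maxSize = 8388608}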
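
sourceFileDB (Handler/Utils/Files.hs) streams a file by listing its
file_content_entry rows in ix order and paging each referenced chunk out
of the database with substring, dbChunksize bytes at a time; the offset is
1-indexed, as SQL substring is, and paging stops once a page comes back
short. The same unfold, restated over a pure ByteString (names here are
illustrative, not from the codebase):

    import qualified Data.ByteString as BS
    import qualified Data.ByteString.Char8 as BC

    -- Stand-in for appFileUploadDBChunksize, shrunk for demonstration.
    pageSize :: Int
    pageSize = 4

    -- Mirrors retrieveChunk: fetch the page at a 1-indexed offset and
    -- compute a next offset only while pages come back full.
    retrievePage :: BS.ByteString -> Int -> Maybe (BS.ByteString, Maybe Int)
    retrievePage row start
      | BS.null page               = Nothing
      | BS.length page >= pageSize = Just (page, Just (start + pageSize))
      | otherwise                  = Just (page, Nothing)
      where page = BS.take pageSize (BS.drop (start - 1) row)

    -- Unfold one chunk row into its pages, as C.unfoldM does in sourceFileDB.
    pages :: BS.ByteString -> [BS.ByteString]
    pages row = go (Just 1)
      where
        go Nothing      = []
        go (Just start) = maybe [] (\(p, next) -> p : go next) (retrievePage row start)

    main :: IO ()
    main = print (concatMap pages (BC.pack <$> ["hello world", "!"]))
    -- ["hell","o wo","rld","!"]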
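
rechunkFile relies on an invariant that makes the whole job safe to run
in place: a FileContentReference hashes the concatenated file content,
never the chunk boundaries, so piping sourceFileDB into sinkFileDB True
must reproduce the identical reference, and RechunkFileExceptionHashMismatch
aborts the transaction otherwise. A quick check of that invariant; SHA256
here is an assumption standing in for whatever digest Crypto.hash uses
behind _Wrapped:

    import Crypto.Hash (Digest, SHA256, hash)
    import qualified Data.ByteString as BS
    import qualified Data.ByteString.Char8 as BC

    -- Chunk boundaries must not affect the whole-file digest.
    digest :: [BS.ByteString] -> Digest SHA256
    digest = hash . BS.concat

    main :: IO ()
    main = do
      let content   = BC.pack "some file content"
          oldChunks = [BS.take 5 content, BS.drop 5 content]  -- fixed-size split
          newChunks = [BS.take 9 content, BS.drop 9 content]  -- boundary moved
      print (digest oldChunks == digest newChunks)  -- True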
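
JobRechunkFiles is throttled through the new TokenBucketRechunkFiles
bucket: each candidate costs its size in bytes (view _2), the bucket holds
at most depth = 20971520 tokens (a 20MiB burst), and tokens refill at
1/inv-rate = 1/9.5e-7, roughly 1048576 per second, i.e. about 1MiB/s
sustained. A pure model of that arithmetic, assuming one token per byte
and inv-rate in seconds per token:

    -- Assumed token-bucket semantics matching the settings comments.
    depth, initialValue :: Double
    depth        = 20971520  -- 20MiB burst
    initialValue = 0

    invRate :: Double
    invRate = 9.5e-7         -- seconds per byte, ~1MiB/s refill

    -- Tokens available after idling t seconds with v tokens banked.
    available :: Double -> Double -> Double
    available v t = min depth (v + t / invRate)

    main :: IO ()
    main = do
      print (1 / invRate / 1048576)      -- sustained rate in MiB/s, ~1.0
      print (available initialValue 10)  -- ~10MiB of budget after 10s idle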
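
The jLocked rework prevents an instance from stealing stale locks from
itself: jobHeldLocks tracks, per process, which QueuedJobIds are currently
being worked on, and the stale-lock takeover additionally requires
"not isHeld", so a slow but still-running worker keeps its lock even after
appJobStaleThreshold has passed. The guard has roughly this shape
(illustrative only; the real jLocked throws JLocked rather than blocking,
and also persists the lock in the queued_job row):

    import Control.Concurrent.STM
    import Control.Exception (bracket)
    import Data.Set (Set)
    import qualified Data.Set as Set

    type JobId = Int

    -- Register the held lock for the duration of the action; takeover is
    -- refused while the id is in the set.
    withJobLock :: TVar (Set JobId) -> JobId -> IO a -> IO a
    withJobLock held jId act = bracket acquire (const release) (const act)
      where
        acquire = atomically $ do
          hs <- readTVar held
          check (jId `Set.notMember` hs)  -- still held here: do not steal it
          writeTVar held (Set.insert jId hs)
        release = atomically $ modifyTVar' held (Set.delete jId)

    main :: IO ()
    main = do
      held <- newTVarIO Set.empty
      withJobLock held 42 (putStrLn "performing job 42")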