feat(files): content dependent chunking

parent 58c2420458
commit d624a951c5
@@ -159,8 +159,13 @@ upload-cache:
   auto-discover-region: "_env:UPLOAD_S3_AUTO_DISCOVER_REGION:true"
   disable-cert-validation: "_env:UPLOAD_S3_DISABLE_CERT_VALIDATION:false"
 upload-cache-bucket: "uni2work-uploads"
-inject-files: 10
-file-upload-db-chunksize: 1048576 # 1MiB
+inject-files: 307
+rechunk-files: 601
+
+file-upload-db-chunksize: 4194304 # 4MiB
+file-chunking-target-exponent: 21 # 2MiB
+file-chunking-hash-window: 4096
 
 server-sessions:
   idle-timeout: 28807
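A note on the two new chunking knobs, as a minimal sketch (not part of the commit): the target exponent reads as the log2 of the desired average chunk size, which matches the "# 2MiB" comment (2^21 bytes = 2 MiB), while the hash window is the size in bytes of the rolling-hash window slid over the content.

-- Hedged sketch: how the exponent setting relates to the average chunk
-- size it targets (assumption based on the "# 2MiB" comment above).
averageChunkSize :: Int -> Integer
averageChunkSize targetExponent = 2 ^ targetExponent

main :: IO ()
main = print $ averageChunkSize 21  -- 2097152 bytes = 2 MiB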
@@ -231,6 +236,9 @@ token-buckets:
     depth: 1572864000 # 1500MiB
     inv-rate: 1.9e-6 # 2MiB/s
     initial-value: 0
 
+  rechunk-files:
+    depth: 20971520 # 20MiB
+    inv-rate: 9.5e-7 # 1MiB/s
+    initial-value: 0
 
 fallback-personalised-sheet-files-keys-expire: 2419200
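A plausible reading of these buckets (an assumption, not spelled out in the diff) is one token per byte: depth is then the burst capacity in bytes and inv-rate the seconds to refill one token, so the sustained rate is 1/inv-rate. That checks out for the new bucket (1/9.5e-7 ≈ 1.05e6 B/s ≈ 1 MiB/s, and 20971520 bytes = 20 MiB); under the same reading 1/1.9e-6 comes to roughly 0.5 MiB/s, so the "# 2MiB/s" comment above appears to assume a different token granularity.

-- Sanity check of the bucket comments under the one-token-per-byte assumption.
sustainedBytesPerSecond :: Double -> Double
sustainedBytesPerSecond invRate = 1 / invRate

main :: IO ()
main = do
  print $ sustainedBytesPerSecond 9.5e-7 / 1048576  -- ~1.0 MiB/s, matches "# 1MiB/s"
  print $ sustainedBytesPerSecond 1.9e-6 / 1048576  -- ~0.5 MiB/s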
@@ -151,6 +151,7 @@ dependencies:
 - minio-hs
 - network-ip
 - data-textual
+- fastcdc
 
 other-extensions:
 - GeneralizedNewtypeDeriving
@@ -2,6 +2,7 @@ module Handler.Utils.Files
   ( sourceFile, sourceFile'
   , sourceFiles, sourceFiles'
   , SourceFilesException(..)
+  , sourceFileDB
   ) where
 
 import Import
@@ -23,6 +24,28 @@ data SourceFilesException
   deriving anyclass (Exception)
 
 
+sourceFileDB :: (MonadThrow m, MonadHandler m, HandlerSite m ~ UniWorX)
+             => FileContentReference -> ConduitT () ByteString (SqlPersistT m) ()
+sourceFileDB fileReference = do
+  dbChunksize <- getsYesod $ view _appFileUploadDBChunksize
+  let retrieveChunk chunkHash = \case
+        Nothing -> return Nothing
+        Just start -> do
+          chunk <- E.selectMaybe . E.from $ \fileContentChunk -> do
+            E.where_ $ fileContentChunk E.^. FileContentChunkId E.==. E.val chunkHash
+            return $ E.substring (fileContentChunk E.^. FileContentChunkContent) (E.val start) (E.val dbChunksize)
+          case chunk of
+            Nothing -> throwM SourceFilesContentUnavailable
+            Just (E.Value c) -> return . Just . (c, ) $ if
+              | olength c >= dbChunksize -> Just $ start + dbChunksize
+              | otherwise -> Nothing
+      chunkHashes = E.selectSource . E.from $ \fileContentEntry -> do
+        E.where_ $ fileContentEntry E.^. FileContentEntryHash E.==. E.val fileReference
+        E.orderBy [ E.asc $ fileContentEntry E.^. FileContentEntryIx ]
+        return $ fileContentEntry E.^. FileContentEntryChunkHash
+  chunkHashes .| C.map E.unValue .| awaitForever (\chunkHash -> C.unfoldM (retrieveChunk chunkHash) $ Just (1 :: Int))
+
+
 sourceFiles :: Monad m => ConduitT FileReference DBFile m ()
 sourceFiles = C.map sourceFile
 
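The retrieval loop in sourceFileDB reads each stored chunk in dbChunksize slices via SQL substring (whose offsets are 1-based) and stops once a slice comes back shorter than the slice size. The same pattern in isolation, with a hypothetical readSlice standing in for the esqueleto query:

import qualified Data.ByteString as BS
import Data.Conduit (ConduitT)
import qualified Data.Conduit.Combinators as C

-- Unfold a conduit from a "read slice at offset" action, advancing by the
-- slice size until a short (or empty) slice signals the end of the chunk.
paginateSlices :: Monad m
               => Int                       -- slice size (dbChunksize)
               -> (Int -> m BS.ByteString)  -- readSlice: 1-based offset -> bytes
               -> ConduitT i BS.ByteString m ()
paginateSlices size readSlice = C.unfoldM step (Just 1)  -- SQL substring is 1-based
  where
    step Nothing = return Nothing
    step (Just start) = do
      s <- readSlice start
      return $ if BS.null s
        then Nothing
        else Just (s, if BS.length s >= size then Just (start + size) else Nothing)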
@@ -39,24 +62,7 @@ sourceFile FileReference{..} = File
     toFileContent fileReference = do
       inDB <- lift . E.selectExists . E.from $ \fileContentEntry -> E.where_ $ fileContentEntry E.^. FileContentEntryHash E.==. E.val fileReference
       if
-        | inDB -> do
-            dbChunksize <- getsYesod $ view _appFileUploadDBChunksize
-            let retrieveChunk chunkHash = \case
-                  Nothing -> return Nothing
-                  Just start -> do
-                    chunk <- E.selectMaybe . E.from $ \fileContentChunk -> do
-                      E.where_ $ fileContentChunk E.^. FileContentChunkId E.==. E.val chunkHash
-                      return $ E.substring (fileContentChunk E.^. FileContentChunkContent) (E.val start) (E.val dbChunksize)
-                    case chunk of
-                      Nothing -> throwM SourceFilesContentUnavailable
-                      Just (E.Value c) -> return . Just . (c, ) $ if
-                        | olength c >= dbChunksize -> Just $ start + dbChunksize
-                        | otherwise -> Nothing
-                chunkHashes = E.selectSource . E.from $ \fileContentEntry -> do
-                  E.where_ $ fileContentEntry E.^. FileContentEntryHash E.==. E.val fileReference
-                  E.orderBy [ E.asc $ fileContentEntry E.^. FileContentEntryIx ]
-                  return $ fileContentEntry E.^. FileContentEntryChunkHash
-            chunkHashes .| C.map E.unValue .| awaitForever (\chunkHash -> C.unfoldM (retrieveChunk chunkHash) $ Just (1 :: Int))
+        | inDB -> sourceFileDB fileReference
         | otherwise -> do
             chunkVar <- newEmptyTMVarIO
             minioAsync <- lift . allocateLinkedAsync $
 
src/Jobs.hs
@@ -5,7 +5,7 @@ module Jobs
   , stopJobCtl
   ) where
 
-import Import
+import Import hiding (StateT)
 import Jobs.Types as Types hiding (JobCtl(JobCtlQueue))
 import Jobs.Queue
 import Jobs.Crontab
@@ -30,6 +30,7 @@ import qualified Data.Map.Strict as Map
 import Data.Map.Strict ((!))
 
 import Control.Monad.Trans.RWS.Lazy (RWST, mapRWST, evalRWST)
+import Control.Monad.Trans.State.Strict (StateT, evalStateT)
 import qualified Control.Monad.State.Class as State
 import Control.Monad.Writer.Class (MonadWriter(..))
 import Control.Monad.Trans.Cont (ContT(..), callCC)
@@ -99,6 +100,7 @@ handleJobs foundation@UniWorX{..}
       jobConfirm <- liftIO $ newTVarIO HashMap.empty
       jobShutdown <- liftIO newEmptyTMVarIO
       jobCurrentCrontab <- liftIO $ newTVarIO Nothing
+      jobHeldLocks <- liftIO $ newTVarIO Set.empty
       atomically $ putTMVar appJobState JobState
         { jobContext = JobContext{..}
         , ..
@@ -414,13 +416,14 @@ handleJobs' wNum = C.mapM_ $ \jctl -> withJobWorkerState wNum JobWorkerBusy $ do
     handleCmd JobCtlTest = return ()
     handleCmd JobCtlFlush = void . lift . runDB . runConduit $ selectKeys [] [ Asc QueuedJobCreationTime ] .| C.mapM_ (\j -> lift $ runReaderT (writeJobCtl $ JobCtlPerform j) =<< getYesod)
     handleCmd (JobCtlQueue job) = lift $ queueJob' job
-    handleCmd (JobCtlPerform jId) = lift . handle handleQueueException . jLocked jId $ \j@QueuedJob{..} -> do
+    handleCmd (JobCtlPerform jId) = handle handleQueueException . jLocked jId $ \(Entity _ j@QueuedJob{..}) -> lift $ do
       content <- case fromJSON queuedJobContent of
         Aeson.Success c -> return c
         Aeson.Error t -> do
          $logErrorS logIdent $ "Aeson decoding error: " <> pack t
          throwM $ JInvalid jId j
 
-      $logInfoS logIdent $ tshow content
+      $logDebugS logIdent . LT.toStrict . decodeUtf8 $ Aeson.encode content
 
      instanceID' <- getsYesod $ view instanceID
@@ -466,40 +469,45 @@ handleJobs' wNum = C.mapM_ $ \jctl -> withJobWorkerState wNum JobWorkerBusy $ do
             . Set.filter (((/=) `on` classifyHealthReport) newReport . snd)
       atomically . modifyTVar' hrStorage $ force . updateReports
 
-    jLocked :: QueuedJobId -> (QueuedJob -> Handler a) -> Handler a
-    jLocked jId act = do
-      hasLock <- liftIO $ newTVarIO False
-
+    jLocked :: QueuedJobId -> (Entity QueuedJob -> ReaderT JobContext Handler a) -> ReaderT JobContext Handler a
+    jLocked jId act = flip evalStateT False $ do
       let
-        lock = runDB . setSerializable $ do
-          qj@QueuedJob{..} <- maybe (throwM $ JNonexistant jId) return =<< get jId
+        lock :: StateT Bool (ReaderT JobContext Handler) (Entity QueuedJob)
+        lock = hoist (hoist $ runDB . setSerializable) $ do
+          qj@QueuedJob{..} <- lift . lift $ maybe (throwM $ JNonexistant jId) return =<< get jId
           instanceID' <- getsYesod $ view instanceID
           threshold <- getsYesod $ view _appJobStaleThreshold
           now <- liftIO getCurrentTime
+          heldLocks <- asks jobHeldLocks
+          isHeld <- (jId `Set.member`) <$> atomically (readTVar heldLocks)
           hadStale <- maybeT (return False) $ do
            lockTime <- MaybeT $ return queuedJobLockTime
            lockInstance <- MaybeT $ return queuedJobLockInstance
            if
              | lockInstance == instanceID'
              , diffUTCTime now lockTime >= threshold
+              , not isHeld
                -> return True
              | otherwise
                -> throwM $ JLocked jId lockInstance lockTime
          when hadStale .
            $logWarnS "Jobs" $ "Ignored stale lock: " <> tshow (Entity jId qj)
-          val <- updateGet jId [ QueuedJobLockInstance =. Just instanceID'
-                               , QueuedJobLockTime =. Just now
-                               ]
-          liftIO . atomically $ writeTVar hasLock True
-          return val
+          State.put True
+          val <- lift . lift $ updateGet jId [ QueuedJobLockInstance =. Just instanceID'
+                                             , QueuedJobLockTime =. Just now
+                                             ]
+          atomically . modifyTVar' heldLocks $ Set.insert jId
+          return $ Entity jId val
 
-        unlock = whenM (readTVarIO hasLock) $
-          runDB . setSerializable $
-            update jId [ QueuedJobLockInstance =. Nothing
-                       , QueuedJobLockTime =. Nothing
-                       ]
+        unlock :: Entity QueuedJob -> StateT Bool (ReaderT JobContext Handler) ()
+        unlock (Entity jId' _) = whenM State.get $ do
+          atomically . flip modifyTVar' (Set.delete jId') =<< asks jobHeldLocks
+          lift . lift . runDB . setSerializable $
+            update jId' [ QueuedJobLockInstance =. Nothing
+                        , QueuedJobLockTime =. Nothing
+                        ]
 
-      bracket lock (const unlock) act
+      bracket lock unlock $ lift . act
 
 
     pruneLastExecs :: Crontab JobCtl -> DB ()
 
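The stale-lock policy jLocked now enforces can be stated as a pure predicate: a previously-written lock may be stolen only when it belongs to this very instance, has exceeded the staleness threshold, and is absent from jobHeldLocks (i.e. no live worker in this process still holds it). A minimal sketch with hypothetical parameter names:

import Data.Time (NominalDiffTime, UTCTime, diffUTCTime)

-- Mirrors the guards in jLocked; any lock that is not stale raises JLocked instead.
isStaleLock :: Eq instanceId
            => instanceId       -- this instance (instanceID')
            -> NominalDiffTime  -- _appJobStaleThreshold
            -> UTCTime          -- now
            -> Bool             -- jId `Set.member` jobHeldLocks?
            -> instanceId       -- queuedJobLockInstance
            -> UTCTime          -- queuedJobLockTime
            -> Bool
isStaleLock self threshold now isHeld holder lockTime =
  holder == self && diffUTCTime now lockTime >= threshold && not isHeld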
@@ -89,6 +89,15 @@ determineCrontab = execWriterT $ do
       , cronRateLimit = iInterval
       , cronNotAfter = Right CronNotScheduled
       }
+  whenIsJust appRechunkFiles $ \rInterval ->
+    tell $ HashMap.singleton
+      (JobCtlQueue JobRechunkFiles)
+      Cron
+        { cronInitial = CronAsap
+        , cronRepeat = CronRepeatScheduled CronAsap
+        , cronRateLimit = rInterval
+        , cronNotAfter = Right CronNotScheduled
+        }
 
   tell . flip foldMap universeF $ \kind ->
     case appHealthCheckInterval kind of
@@ -1,7 +1,7 @@
 module Jobs.Handler.Files
   ( dispatchJobPruneSessionFiles
   , dispatchJobPruneUnreferencedFiles
-  , dispatchJobInjectFiles
+  , dispatchJobInjectFiles, dispatchJobRechunkFiles
   ) where
 
 import Import hiding (matching, maximumBy, init)
@@ -30,6 +30,8 @@ import Control.Monad.Random.Lazy
 import System.Random.Shuffle (shuffleM)
 import System.IO.Unsafe
 
+import Handler.Utils.Files (sourceFileDB)
+
 
 dispatchJobPruneSessionFiles :: JobHandler UniWorX
 dispatchJobPruneSessionFiles = JobHandlerAtomic . hoist lift $ do
@@ -216,7 +218,7 @@ dispatchJobInjectFiles = JobHandlerException . maybeT (return ()) $ do
           chunkVar <- newEmptyTMVarIO
           dbAsync <- allocateLinkedAsync $ do
             atomically $ isEmptyTMVar chunkVar >>= guard . not
-            sinkFileDB $ C.unfoldM (\x -> fmap (, x) <$> atomically (takeTMVar chunkVar)) ()
+            sinkFileDB False $ C.unfoldM (\x -> fmap (, x) <$> atomically (takeTMVar chunkVar)) ()
 
           didSend <- maybeT (return False) . hoistMaybeM . runAppMinio . runMaybeT $ do
             objRes <- catchIfMaybeT minioIsDoesNotExist $ Minio.getObject uploadBucket obj Minio.defaultGetObjectOptions
@@ -245,3 +247,47 @@ dispatchJobInjectFiles = JobHandlerException . maybeT (return ()) $ do
 
   when (inj > 0) $
     $logInfoS "InjectFiles" [st|Injected #{inj} files from upload cache into database|]
+
+
+data RechunkFileException
+  = RechunkFileExceptionHashMismatch
+      { oldHash, newHash :: FileContentReference }
+  deriving (Eq, Ord, Show, Generic, Typeable)
+  deriving anyclass (Exception)
+
+dispatchJobRechunkFiles :: JobHandler UniWorX
+dispatchJobRechunkFiles = JobHandlerAtomic . hoist lift $ do
+  interval <- getsYesod $ view _appRechunkFiles
+  let
+    getEntryCandidates = E.selectSource . E.from $ \fileContentEntry -> E.distinctOnOrderBy [E.asc $ fileContentEntry E.^. FileContentEntryHash] $ do
+      E.where_ . E.exists . E.from $ \(fileContentEntry' `E.InnerJoin` fileContentChunk) -> do
+        E.on $ fileContentChunk E.^. FileContentChunkId E.==. fileContentEntry' E.^. FileContentEntryChunkHash
+        E.where_ $ fileContentEntry' E.^. FileContentEntryHash E.==. fileContentEntry E.^. FileContentEntryHash
+        E.where_ . E.not_ $ fileContentChunk E.^. FileContentChunkContentBased
+
+      let size = E.subSelectMaybe . E.from $ \(fileContentEntry' `E.InnerJoin` fileContentChunk) -> do
+            E.on $ fileContentChunk E.^. FileContentChunkId E.==. fileContentEntry' E.^. FileContentEntryChunkHash
+            E.where_ $ fileContentEntry' E.^. FileContentEntryHash E.==. fileContentEntry E.^. FileContentEntryHash
+            return $ E.sum_ (E.length_ $ fileContentChunk E.^. FileContentChunkContent :: E.SqlExpr (E.Value Word64))
+
+      return $ ( fileContentEntry E.^. FileContentEntryHash
+               , size
+               )
+
+    rechunkFile :: FileContentReference -> Word64 -> DB (Sum Natural, Sum Word64)
+    rechunkFile fRef sz = do
+      fRef' <- sinkFileDB True $ sourceFileDB fRef
+      unless (fRef == fRef') $
+        throwM $ RechunkFileExceptionHashMismatch fRef fRef'
+      return (Sum 1, Sum sz)
+
+  (Sum rechunkedEntries, Sum rechunkedSize) <- runConduit $
+       getEntryCandidates
+    .| C.mapMaybe (\(E.Value fRef, E.Value sz) -> (fRef, ) <$> sz)
+    .| maybe (C.map id) (takeWhileTime . (/ 2)) interval
+    .| persistentTokenBucketTakeC' TokenBucketRechunkFiles (view _2 :: _ -> Word64)
+    .| C.mapM (uncurry rechunkFile)
+    .| C.fold
+
+  when (rechunkedEntries > 0 || rechunkedSize > 0) $
+    $logInfoS "RechunkFiles" [st|Rechunked #{tshow rechunkedEntries} files in database (#{tshow rechunkedSize} bytes)|]
 
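dispatchJobRechunkFiles rewrites files whose chunks are not yet content-based, and the round trip only commits if the whole-file hash recomputed over the rewritten chunks equals the stored reference. That invariant in isolation, as a minimal sketch with hypothetical names:

-- resink reads the old chunks and writes new ones, returning the hash
-- computed over the rewritten content; a mismatch corresponds to
-- RechunkFileExceptionHashMismatch above and must abort the transaction.
verifyRechunk :: (Monad m, Eq ref)
              => (src -> m ref)  -- resink, like sinkFileDB True . sourceFileDB
              -> ref             -- stored reference (old hash)
              -> src
              -> m (Either (ref, ref) ref)
verifyRechunk resink oldRef src = do
  newRef <- resink src
  return $ if newRef == oldRef then Right newRef else Left (oldRef, newRef)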
@@ -27,5 +27,5 @@ dispatchJobDeleteTransactionLogIPs = JobHandlerAtomic . hoist lift $ do
   retentionTime <- getsYesod $ view _appTransactionLogIPRetentionTime
   let cutoff = addUTCTime (- retentionTime) now
 
-  n <- updateWhereCount [ TransactionLogTime <. cutoff ] [ TransactionLogRemote =. Nothing ]
+  n <- updateWhereCount [ TransactionLogTime <. cutoff, TransactionLogRemote !=. Nothing ] [ TransactionLogRemote =. Nothing ]
   $logInfoS "DeleteTransactionLogIPs" [st|Deleted #{n} IP entries from transaction log|]
@@ -82,7 +82,7 @@ writeJobCtlBlock = writeJobCtlBlock' writeJobCtl
 
 queueJobUnsafe :: Bool -> Job -> YesodDB UniWorX (Maybe QueuedJobId)
 queueJobUnsafe queuedJobWriteLastExec job = do
-  $logInfoS "queueJob" $ tshow job
+  $logDebugS "queueJob" $ tshow job
 
   doQueue <- fmap not . and2M (return $ jobNoQueueSame job) $ exists [ QueuedJobContent ==. toJSON job ]
 
@@ -92,6 +92,7 @@ data Job = JobSendNotification { jRecipient :: UserId, jNotification :: Notification
            }
          | JobInjectFiles
          | JobPruneFallbackPersonalisedSheetFilesKeys
+         | JobRechunkFiles
   deriving (Eq, Ord, Show, Read, Generic, Typeable)
 data Notification = NotificationSubmissionRated { nSubmission :: SubmissionId }
                   | NotificationSheetActive { nSheet :: SheetId }
@@ -226,6 +227,7 @@ newWorkerId = JobWorkerId <$> liftIO newUnique
 data JobContext = JobContext
   { jobCrontab :: TVar (Crontab JobCtl)
   , jobConfirm :: TVar (HashMap JobCtl (NonEmpty (TMVar (Maybe SomeException))))
+  , jobHeldLocks :: TVar (Set QueuedJobId)
   }
 
@@ -254,6 +256,8 @@ jobNoQueueSame = \case
   JobPruneSessionFiles{} -> True
   JobPruneUnreferencedFiles{} -> True
   JobInjectFiles{} -> True
   JobPruneFallbackPersonalisedSheetFilesKeys{} -> True
+  JobRechunkFiles{} -> True
   _ -> False
 
@@ -929,7 +929,7 @@ customMigrations = Map.fromListWith (>>)
 
         CREATE TABLE file_content_chunk_unreferenced (id bigserial, hash bytea NOT NULL, since timestamp with time zone NOT NULL);
         INSERT INTO file_content_chunk_unreferenced (since, hash) (SELECT unreferenced_since as since, hash FROM file_content_chunk WHERE NOT (unreferenced_since IS NULL));
-        ALTER TABLE file_content_chunk_chunk DROP COLUMN unreferenced_since;
+        ALTER TABLE file_content_chunk DROP COLUMN unreferenced_since;
 
         CREATE TABLE file_content_entry (hash bytea NOT NULL, ix bigint NOT NULL, chunk_hash bytea NOT NULL);
         INSERT INTO file_content_entry (hash, chunk_hash, ix) (SELECT hash, hash as chunk_hash, 0 as ix FROM file_content_chunk);
@@ -267,6 +267,7 @@ instance Csv.FromField Sex where
 
 data TokenBucketIdent = TokenBucketInjectFiles
                       | TokenBucketPruneFiles
+                      | TokenBucketRechunkFiles
   deriving (Eq, Ord, Read, Show, Enum, Bounded, Generic, Typeable)
   deriving anyclass (Universe, Finite, Hashable)
 
@@ -70,6 +70,8 @@ import Text.Show (showParen, showString)
 import qualified Data.List.PointedList as P
 
 import qualified Network.Minio as Minio
 
+import Data.Conduit.Algorithms.FastCDC
+
 
 -- | Runtime settings to configure this application. These settings can be
@@ -174,8 +176,11 @@ data AppSettings = AppSettings
   , appUploadCacheConf :: Maybe Minio.ConnectInfo
   , appUploadCacheBucket :: Minio.Bucket
   , appInjectFiles :: Maybe NominalDiffTime
+  , appRechunkFiles :: Maybe NominalDiffTime
   , appFileUploadDBChunksize :: Int
 
+  , appFileChunkingParams :: FastCDCParameters
+
   , appFavouritesQuickActionsBurstsize
   , appFavouritesQuickActionsAvgInverseRate :: Word64
   , appFavouritesQuickActionsTimeout :: DiffTime
@@ -476,8 +481,13 @@ instance FromJSON AppSettings where
     appSessionFilesExpire <- o .: "session-files-expire"
     appKeepUnreferencedFiles <- o .:? "keep-unreferenced-files" .!= 0
     appInjectFiles <- o .:? "inject-files"
+    appRechunkFiles <- o .:? "rechunk-files"
     appFileUploadDBChunksize <- o .: "file-upload-db-chunksize"
+
+    appFileChunkingTargetExponent <- o .: "file-chunking-target-exponent"
+    appFileChunkingHashWindow <- o .: "file-chunking-hash-window"
+    appFileChunkingParams <- maybe (fail "Could not recommend FastCDCParameters") return $ recommendFastCDCParameters appFileChunkingTargetExponent appFileChunkingHashWindow
 
     appPruneUnreferencedFilesWithin <- o .: "prune-unreferenced-files-within"
     appPruneUnreferencedFilesInterval <- o .: "prune-unreferenced-files-interval"
 
@@ -32,23 +32,31 @@ import qualified Data.UUID.V4 as UUID
 import qualified Database.Esqueleto as E
 import qualified Database.Esqueleto.Utils as E
 
+import Data.Conduit.Algorithms.FastCDC (fastCDC)
+
 
 sinkFileDB :: (MonadCatch m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m, BackendCompatible SqlBackend (YesodPersistBackend UniWorX), YesodPersist UniWorX)
-           => ConduitT () ByteString (SqlPersistT m) () -> SqlPersistT m FileContentReference
-sinkFileDB fileContentContent = do
-  dbChunksize <- getsYesod $ view _appFileUploadDBChunksize
+           => Bool -- ^ Replace? Use only in serializable transaction
+           -> ConduitT () ByteString (SqlPersistT m) ()
+           -> SqlPersistT m FileContentReference
+sinkFileDB doReplace fileContentContent = do
+  chunkingParams <- getsYesod $ view _appFileChunkingParams
 
   let sinkChunk fileContentChunkContent = do
         fileChunkLockTime <- liftIO getCurrentTime
         fileChunkLockInstance <- getsYesod appInstanceID
 
         tellM $ Set.singleton <$> insert FileChunkLock{ fileChunkLockHash = fileContentChunkHash, .. }
-        lift . handleIfSql isUniqueConstraintViolation (const $ return ()) $
-          insert_ FileContentChunk{..}
+        existsChunk <- lift $ exists [FileContentChunkHash ==. fileContentChunkHash]
+        let setContentBased = updateWhere [FileContentChunkHash ==. fileContentChunkHash] [FileContentChunkContentBased =. fileContentChunkContentBased]
+        if | existsChunk -> lift setContentBased
+           | otherwise -> lift . handleIfSql isUniqueConstraintViolation (const $ setContentBased) $
+               insert_ FileContentChunk{..}
         return $ FileContentChunkKey fileContentChunkHash
        where fileContentChunkHash = _Wrapped # Crypto.hash fileContentChunkContent
-  ((review _Wrapped -> fileContentHash, fileContentChunks), chunkLocks) <- runConduit . runWriterC $ transPipe lift fileContentContent
-    .| C.chunksOfE dbChunksize
+  ((review _Wrapped -> fileContentHash, fileContentChunks), chunkLocks) <- runConduit . runWriterC $
+       transPipe lift fileContentContent
+    .| fastCDC chunkingParams
     .| C.mapM (\c -> (c, ) <$> sinkChunk c)
    .| transPipe lift (getZipConduit $ (,) <$> ZipConduit (C.map (view _1) .| Crypto.sinkHash) <*> ZipConduit (C.foldMap $ views _2 Seq.singleton))
 
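Replacing C.chunksOfE dbChunksize with fastCDC chunkingParams is the heart of the commit: chunk boundaries are now chosen where a rolling hash over the content hits a target pattern instead of at fixed byte offsets, so inserting bytes near the front of a file only disturbs nearby chunks, and the hash-addressed FileContentChunk rows dedupe across file versions. A toy illustration of the principle (not the gearhash rolling hash FastCDC actually uses):

import Data.Bits (shiftL, xor, (.&.))
import qualified Data.ByteString as BS
import Data.Word (Word32)

-- Cut wherever the low 12 bits of a toy rolling value are zero, giving
-- ~4 KiB chunks on average; boundaries move with the content itself.
toyChunks :: BS.ByteString -> [BS.ByteString]
toyChunks = go 0 0
  where
    mask = 0xFFF :: Word32
    go _ _ bs | BS.null bs = []
    go h i bs
      | i < BS.length bs =
          let h' = (h `shiftL` 1) `xor` fromIntegral (BS.index bs i)
          in if h' .&. mask == 0
               then let (c, rest) = BS.splitAt (i + 1) bs in c : go 0 0 rest
               else go h' (i + 1) bs
      | otherwise = [bs]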
@@ -63,14 +71,19 @@ sinkFileDB fileContentContent = do
   deleteWhere [ FileChunkLockId <-. Set.toList chunkLocks ]
 
   let entryExists = E.selectExists . E.from $ \fileContentEntry -> E.where_ $ fileContentEntry E.^. FileContentEntryHash E.==. E.val fileContentHash
-  unlessM entryExists . void $
-    insertMany_ [ FileContentEntry{ fileContentEntryHash = fileContentHash, .. }
-                | fileContentEntryChunkHash <- otoList fileContentChunks
-                | fileContentEntryIx <- [0..]
-                ]
+      insertEntries = handleIfSql isUniqueConstraintViolation (const $ return ()) . void $ insertMany_
+        [ FileContentEntry{ fileContentEntryHash = fileContentHash, .. }
+        | fileContentEntryChunkHash <- otoList fileContentChunks
+        | fileContentEntryIx <- [0..]
+        ]
+  if | not doReplace -> unlessM entryExists insertEntries
+     | otherwise -> do
+         deleteWhere [ FileContentEntryHash ==. fileContentHash ]
+         insertEntries
 
 
   return fileContentHash
-  where fileContentChunkContentBased = False
+  where fileContentChunkContentBased = True
 
 
 sinkFiles :: (MonadCatch m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m, BackendCompatible SqlBackend (YesodPersistBackend UniWorX), YesodPersist UniWorX) => ConduitT (File (SqlPersistT m)) FileReference (SqlPersistT m) ()
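The new Bool on sinkFileDB selects between two write modes for the entry rows: False keeps whatever is already stored (duplicate inserts become no-ops), while True drops the existing rows and rewrites them, which is only safe under the serializable isolation the Haddock comment demands. Schematically, with hypothetical action parameters:

writeEntries :: Monad m
             => Bool    -- doReplace; True only inside a serializable transaction
             -> m Bool  -- do entry rows for this hash already exist?
             -> m ()    -- delete existing entry rows
             -> m ()    -- insert entry rows (idempotent on unique violation)
             -> m ()
writeEntries doReplace entryExists delete insert
  | doReplace = delete >> insert
  | otherwise = entryExists >>= \e -> if e then return () else insert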
@@ -86,7 +99,7 @@ sinkFile File{ fileContent = Just fileContentContent, .. } = do
   (unsealConduitT -> fileContentContent', isEmpty) <- fileContentContent $$+ is _Nothing <$> C.peekE
 
   fileContentHash <- if
-    | not isEmpty -> maybeT (sinkFileDB fileContentContent') $ do
+    | not isEmpty -> maybeT (sinkFileDB False fileContentContent') $ do
         uploadBucket <- getsYesod $ views appSettings appUploadCacheBucket
         chunk <- liftIO newEmptyMVar
         let putChunks = do
 
@@ -47,6 +47,12 @@ extra-deps:
 - filepath-crypto
 - uuid-crypto
 
+- git: https://github.com/gkleen/FastCDC.git
+  commit: 7326e2931454282df9081105dad812845db5c530
+  subdirs:
+    - gearhash
+    - fastcdc
+
 - generic-lens-1.2.0.0@sha256:b19e7970c93743a46bc3702331512a96d163de4356472f2d51a2945887aefe8c,6524 # manual downgrade; won't compile with >=2.0.0.0
 
 - acid-state-0.16.0.1@sha256:d43f6ee0b23338758156c500290c4405d769abefeb98e9bc112780dae09ece6f,6207
 
@@ -5,9 +5,6 @@
 
 packages:
 - completed:
-    cabal-file:
-      size: 4229
-      sha256: 0dcfe3c4a67be4e96e1ae2e3c4b8744bc11e094853005a32f6074ab776caa3a9
     name: encoding
     version: 0.8.2
     git: git@gitlab2.rz.ifi.lmu.de:uni2work/encoding.git
@@ -19,9 +16,6 @@ packages:
     git: git@gitlab2.rz.ifi.lmu.de:uni2work/encoding.git
     commit: 22fc3bb14841d8d50997aa47f1be3852e666f787
 - completed:
-    cabal-file:
-      size: 2399
-      sha256: 20cdf97602abb8fd7356c1a64c69fa857e34ab4cfe7834460d2ad783f7e4e4e3
     name: memcached-binary
     version: 0.2.0
     git: git@gitlab2.rz.ifi.lmu.de:uni2work/memcached-binary.git
@@ -33,9 +27,6 @@ packages:
     git: git@gitlab2.rz.ifi.lmu.de:uni2work/memcached-binary.git
     commit: b7071df50bad3a251a544b984e4bf98fa09b8fae
 - completed:
-    cabal-file:
-      size: 1423
-      sha256: 49818ee0de2d55cbfbc15ca4de1761c3adac6ba3dfcdda960b413cad4f4fa47f
     name: conduit-resumablesink
     version: '0.3'
     git: git@gitlab2.rz.ifi.lmu.de:uni2work/conduit-resumablesink.git
@@ -47,9 +38,6 @@ packages:
     git: git@gitlab2.rz.ifi.lmu.de:uni2work/conduit-resumablesink.git
     commit: cbea6159c2975d42f948525e03e12fc390da53c5
 - completed:
-    cabal-file:
-      size: 2069
-      sha256: 9192ac19ea5da3cd4b8c86a4266592aff7b9256311aa5f42ae6de94ccacf1366
     name: HaskellNet
     version: 0.5.1
     git: git://github.com/jtdaugherty/HaskellNet.git
@@ -61,9 +49,6 @@ packages:
     git: git://github.com/jtdaugherty/HaskellNet.git
     commit: 5aa1f3b009253b02c4822005ac59ee208a10a347
 - completed:
-    cabal-file:
-      size: 1934
-      sha256: 9fbe7c3681e963eea213ab38be17966bb690788c1c55a67257916b677d7d2ec2
     name: HaskellNet-SSL
     version: 0.3.4.1
     git: git@gitlab2.rz.ifi.lmu.de:uni2work/HaskellNet-SSL.git
@@ -75,9 +60,6 @@ packages:
     git: git@gitlab2.rz.ifi.lmu.de:uni2work/HaskellNet-SSL.git
     commit: 40393c938111ac78232dc2c7eec5edb4a22d03e8
 - completed:
-    cabal-file:
-      size: 2208
-      sha256: 48f6e03d8f812bd24e2601497ffe9c8a78907fa2266ba05abeefdfe99221617d
     name: ldap-client
     version: 0.4.0
     git: git@gitlab2.rz.ifi.lmu.de:uni2work/ldap-client.git
@@ -90,9 +72,6 @@ packages:
     commit: 01afaf599ba6f8a9d804c269e91d3190b249d3f0
 - completed:
     subdir: serversession
-    cabal-file:
-      size: 2081
-      sha256: a958ff0007e5084e3e4c2a33acc9860c31186105f02f8ab99ecb847a7a8f9497
     name: serversession
     version: 1.0.1
     git: git@gitlab2.rz.ifi.lmu.de:uni2work/serversession.git
@@ -106,9 +85,6 @@ packages:
     commit: 1c95b0100471279413485411032d639881012a5e
 - completed:
     subdir: serversession-backend-acid-state
-    cabal-file:
-      size: 1875
-      sha256: 6cc9d29e788334670bc102213a8aae73bc1b8b0a00c416f06d232376750443b7
     name: serversession-backend-acid-state
     version: 1.0.3
     git: git@gitlab2.rz.ifi.lmu.de:uni2work/serversession.git
@@ -121,9 +97,6 @@ packages:
     git: git@gitlab2.rz.ifi.lmu.de:uni2work/serversession.git
     commit: 1c95b0100471279413485411032d639881012a5e
 - completed:
-    cabal-file:
-      size: 1966
-      sha256: 92d6d7be95a1ee6cb9783deb35e0d4e4959c7de20d518a45370084b57e20ba51
     name: xss-sanitize
     version: 0.3.6
     git: git@gitlab2.rz.ifi.lmu.de:uni2work/xss-sanitize.git
@@ -136,9 +109,6 @@ packages:
     commit: 074ed7c8810aca81f60f2c535f9e7bad67e9d95a
 - completed:
     subdir: colonnade
-    cabal-file:
-      size: 2020
-      sha256: 28f603d097aee65ddf8fe032e7e0f87523a58c516253cba196922027c8fd54d5
     name: colonnade
     version: 1.2.0.2
     git: git@gitlab2.rz.ifi.lmu.de:uni2work/colonnade.git
@@ -151,9 +121,6 @@ packages:
     git: git@gitlab2.rz.ifi.lmu.de:uni2work/colonnade.git
     commit: f8170266ab25b533576e96715bedffc5aa4f19fa
 - completed:
-    cabal-file:
-      size: 9845
-      sha256: 674630347209bc5f7984e8e9d93293510489921f2d2d6092ad1c9b8c61b6560a
     name: minio-hs
     version: 1.5.2
     git: git@gitlab2.rz.ifi.lmu.de:uni2work/minio-hs.git
@@ -166,9 +133,6 @@ packages:
     commit: 42103ab247057c04c8ce7a83d9d4c160713a3df1
 - completed:
     subdir: cryptoids-class
-    cabal-file:
-      size: 1155
-      sha256: 1fa96858ded816798f8e1c77d7945185c0d7ceb2536185d39fc72496da8a0125
     name: cryptoids-class
     version: 0.0.0
     git: git@gitlab2.rz.ifi.lmu.de:uni2work/cryptoids.git
@@ -182,9 +146,6 @@ packages:
     commit: 4d91394475b144ea5bf7ba111f93756cc0de8a3f
 - completed:
     subdir: cryptoids-types
-    cabal-file:
-      size: 1214
-      sha256: ee8966212554a156f2de236d4f005ff3a9d3098778ff6cc3f114ccaa0aff8825
     name: cryptoids-types
     version: 1.0.0
     git: git@gitlab2.rz.ifi.lmu.de:uni2work/cryptoids.git
@@ -198,9 +159,6 @@ packages:
     commit: 4d91394475b144ea5bf7ba111f93756cc0de8a3f
 - completed:
     subdir: cryptoids
-    cabal-file:
-      size: 1505
-      sha256: fcf07cd0dca21db976c25cbdf4dcc5c747cebcb7bf14c05804c8ae14223f6046
     name: cryptoids
     version: 0.5.1.0
     git: git@gitlab2.rz.ifi.lmu.de:uni2work/cryptoids.git
@@ -214,9 +172,6 @@ packages:
     commit: 4d91394475b144ea5bf7ba111f93756cc0de8a3f
 - completed:
     subdir: filepath-crypto
-    cabal-file:
-      size: 1716
-      sha256: 218da063bb7b00e3728deebf830904174b2b78bc29b3f203e6824b8caac92788
     name: filepath-crypto
     version: 0.1.0.0
     git: git@gitlab2.rz.ifi.lmu.de:uni2work/cryptoids.git
@@ -230,9 +185,6 @@ packages:
     commit: 4d91394475b144ea5bf7ba111f93756cc0de8a3f
 - completed:
     subdir: uuid-crypto
-    cabal-file:
-      size: 1460
-      sha256: 1db54db1b85303e50cec3c99ddb8de6c9bedc388fa9ce5a1fce61520023b9ee5
     name: uuid-crypto
     version: 1.4.0.0
     git: git@gitlab2.rz.ifi.lmu.de:uni2work/cryptoids.git
@@ -244,6 +196,32 @@ packages:
     subdir: uuid-crypto
     git: git@gitlab2.rz.ifi.lmu.de:uni2work/cryptoids.git
     commit: 4d91394475b144ea5bf7ba111f93756cc0de8a3f
+- completed:
+    subdir: gearhash
+    name: gearhash
+    version: 0.0.0
+    git: https://github.com/gkleen/FastCDC.git
+    pantry-tree:
+      size: 504
+      sha256: 61a08cdd003dc8a418f410dacbcc3a91adea4c23864f8ddbaba3b762e4c2924d
+    commit: 7326e2931454282df9081105dad812845db5c530
+  original:
+    subdir: gearhash
+    git: https://github.com/gkleen/FastCDC.git
+    commit: 7326e2931454282df9081105dad812845db5c530
+- completed:
+    subdir: fastcdc
+    name: fastcdc
+    version: 0.0.0
+    git: https://github.com/gkleen/FastCDC.git
+    pantry-tree:
+      size: 244
+      sha256: 80ae3d79344f7c6a73a8d7adf07ff2b7f0736fd34760b4b8a15e26f32d9773f6
+    commit: 7326e2931454282df9081105dad812845db5c530
+  original:
+    subdir: fastcdc
+    git: https://github.com/gkleen/FastCDC.git
+    commit: 7326e2931454282df9081105dad812845db5c530
 - completed:
     hackage: generic-lens-1.2.0.0@sha256:b19e7970c93743a46bc3702331512a96d163de4356472f2d51a2945887aefe8c,6524
     pantry-tree: