feat(files): chunking
BREAKING CHANGE: files now chunked
This commit is contained in:
parent
0cdd85eed1
commit
8f608c1955
@ -159,6 +159,7 @@ upload-cache:
|
||||
disable-cert-validation: "_env:UPLOAD_S3_DISABLE_CERT_VALIDATION:false"
|
||||
upload-cache-bucket: "uni2work-uploads"
|
||||
inject-files: 10
|
||||
file-upload-db-chunksize: 1048576 # 1MiB
|
||||
|
||||
server-sessions:
|
||||
idle-timeout: 28807
|
||||
|
||||
@ -1,9 +1,20 @@
|
||||
FileContent
|
||||
FileContentEntry
|
||||
hash FileContentReference
|
||||
ix Natural
|
||||
chunkHash FileContentChunkId
|
||||
UniqueFileContentEntry hash ix
|
||||
|
||||
FileContentChunk
|
||||
hash FileContentChunkReference
|
||||
content ByteString
|
||||
unreferencedSince UTCTime Maybe
|
||||
contentBased Bool default=false -- For Migration
|
||||
Primary hash
|
||||
|
||||
FileContentChunkUnreferenced
|
||||
hash FileContentChunkId
|
||||
since UTCTime
|
||||
UniqueFileContentChunkUnreferenced hash
|
||||
|
||||
SessionFile
|
||||
content FileContentReference Maybe
|
||||
touched UTCTime
|
||||
@ -12,3 +23,8 @@ FileLock
|
||||
content FileContentReference
|
||||
instance InstanceId
|
||||
time UTCTime
|
||||
|
||||
FileChunkLock
|
||||
hash FileContentChunkReference
|
||||
instance InstanceId
|
||||
time UTCTime
|
||||
@ -18,6 +18,8 @@ import Data.Aeson as Aeson
|
||||
|
||||
import Control.Monad.Fail
|
||||
|
||||
import Language.Haskell.TH.Syntax (Lift(liftTyped))
|
||||
import Instances.TH.Lift ()
|
||||
|
||||
instance HashAlgorithm hash => PersistField (Digest hash) where
|
||||
toPersistValue = PersistByteString . convert
|
||||
@ -46,3 +48,6 @@ instance HashAlgorithm hash => FromJSON (Digest hash) where
|
||||
|
||||
instance Hashable (Digest hash) where
|
||||
hashWithSalt s = (hashWithSalt s :: ByteString -> Int) . convert
|
||||
|
||||
instance HashAlgorithm hash => Lift (Digest hash) where
|
||||
liftTyped dgst = [||fromMaybe (error "Lifted digest has wrong length") $ digestFromByteString $$(liftTyped (convert dgst :: ByteString))||]
|
||||
|
||||
@ -6,6 +6,7 @@ module Database.Esqueleto.Utils
|
||||
, justVal, justValList
|
||||
, isJust
|
||||
, isInfixOf, hasInfix
|
||||
, strConcat, substring
|
||||
, or, and
|
||||
, any, all
|
||||
, subSelectAnd, subSelectOr
|
||||
@ -39,7 +40,8 @@ import qualified Data.Set as Set
|
||||
import qualified Data.List as List
|
||||
import qualified Data.Foldable as F
|
||||
import qualified Database.Esqueleto as E
|
||||
import qualified Database.Esqueleto.Internal.Sql as E
|
||||
import qualified Database.Esqueleto.PostgreSQL as E
|
||||
import qualified Database.Esqueleto.Internal.Internal as E
|
||||
import Database.Esqueleto.Utils.TH
|
||||
|
||||
import qualified Data.Text.Lazy as Lazy (Text)
|
||||
@ -96,6 +98,42 @@ hasInfix :: ( E.SqlString s1
|
||||
=> E.SqlExpr (E.Value s2) -> E.SqlExpr (E.Value s1) -> E.SqlExpr (E.Value Bool)
|
||||
hasInfix = flip isInfixOf
|
||||
|
||||
infixl 6 `strConcat`
|
||||
|
||||
strConcat :: E.SqlString s
|
||||
=> E.SqlExpr (E.Value s) -> E.SqlExpr (E.Value s) -> E.SqlExpr (E.Value s)
|
||||
strConcat = E.unsafeSqlBinOp " || "
|
||||
|
||||
substring :: ( E.SqlString str
|
||||
, Num from, Num for
|
||||
)
|
||||
=> E.SqlExpr (E.Value str)
|
||||
-> E.SqlExpr (E.Value from)
|
||||
-> E.SqlExpr (E.Value for)
|
||||
-> E.SqlExpr (E.Value str)
|
||||
substring (E.ERaw p1 f1) (E.ERaw p2 f2) (E.ERaw p3 f3)
|
||||
= E.ERaw E.Never $ \info ->
|
||||
let (strTLB, strVals) = f1 info
|
||||
(fromiTLB, fromiVals) = f2 info
|
||||
(foriTLB, foriVals) = f3 info
|
||||
in ( "SUBSTRING" <> E.parens (E.parensM p1 strTLB <> " FROM " <> E.parensM p2 fromiTLB <> " FOR " <> E.parensM p3 foriTLB)
|
||||
, strVals <> fromiVals <> foriVals
|
||||
)
|
||||
substring a b c = substring (construct a) (construct b) (construct c)
|
||||
where construct :: E.SqlExpr (E.Value a) -> E.SqlExpr (E.Value a)
|
||||
construct (E.ERaw p f) = E.ERaw E.Parens $ \info ->
|
||||
let (b1, vals) = f info
|
||||
build ("?", [E.PersistList vals']) =
|
||||
(E.uncommas $ replicate (length vals') "?", vals')
|
||||
build expr = expr
|
||||
in build (E.parensM p b1, vals)
|
||||
construct (E.ECompositeKey f) =
|
||||
E.ERaw E.Parens $ \info -> (E.uncommas $ f info, mempty)
|
||||
construct (E.EAliasedValue i _) =
|
||||
E.ERaw E.Never $ E.aliasedValueIdentToRawSql i
|
||||
construct (E.EValueReference i i') =
|
||||
E.ERaw E.Never $ E.valueReferenceToRawSql i i'
|
||||
|
||||
and, or :: Foldable f => f (E.SqlExpr (E.Value Bool)) -> E.SqlExpr (E.Value Bool)
|
||||
and = F.foldr (E.&&.) true
|
||||
or = F.foldr (E.||.) false
|
||||
@ -111,8 +149,11 @@ all :: MonoFoldable f => (Element f -> E.SqlExpr (E.Value Bool)) -> f -> E.SqlEx
|
||||
all test = and . map test . otoList
|
||||
|
||||
subSelectAnd, subSelectOr :: E.SqlQuery (E.SqlExpr (E.Value Bool)) -> E.SqlExpr (E.Value Bool)
|
||||
subSelectAnd q = E.subSelectUnsafe $ E.unsafeSqlFunction "bool_and" <$> q
|
||||
subSelectOr q = E.subSelectUnsafe $ E.unsafeSqlFunction "bool_or" <$> q
|
||||
subSelectAnd q = parens . E.subSelectUnsafe $ flip (E.unsafeSqlAggregateFunction "bool_and" E.AggModeAll) [] <$> q
|
||||
subSelectOr q = parens . E.subSelectUnsafe $ flip (E.unsafeSqlAggregateFunction "bool_or" E.AggModeAll) [] <$> q
|
||||
|
||||
parens :: E.SqlExpr (E.Value a) -> E.SqlExpr (E.Value a)
|
||||
parens = E.unsafeSqlFunction ""
|
||||
|
||||
|
||||
-- Allow usage of Tuples as DbtRowKey, i.e. SqlIn instances for tuples
|
||||
|
||||
@ -7,7 +7,7 @@ module Foundation.Type
|
||||
, _SessionStorageMemcachedSql, _SessionStorageAcid
|
||||
, SMTPPool
|
||||
, _appSettings', _appStatic, _appConnPool, _appSmtpPool, _appLdapPool, _appWidgetMemcached, _appHttpManager, _appLogger, _appLogSettings, _appCryptoIDKey, _appClusterID, _appInstanceID, _appJobState, _appSessionStore, _appSecretBoxKey, _appJSONWebKeySet, _appHealthReport
|
||||
, DB, Form, MsgRenderer, MailM
|
||||
, DB, Form, MsgRenderer, MailM, DBFile
|
||||
) where
|
||||
|
||||
import Import.NoFoundation
|
||||
@ -81,3 +81,4 @@ type DB = YesodDB UniWorX
|
||||
type Form x = Html -> MForm (HandlerFor UniWorX) (FormResult x, WidgetFor UniWorX ())
|
||||
type MsgRenderer = MsgRendererS UniWorX -- see Utils
|
||||
type MailM a = MailT (HandlerFor UniWorX) a
|
||||
type DBFile = File (YesodDB UniWorX)
|
||||
|
||||
@ -80,12 +80,12 @@ testDownload = do
|
||||
sourceDBChunks :: ConduitT () Int DB ()
|
||||
sourceDBChunks = forever sourceDBFiles
|
||||
.| C.mapM (\x -> x <$ $logDebugS "testDownload.sourceDBChunks" (tshow $ entityKey x))
|
||||
.| C.map ((length $!!) . fileContentContent . entityVal)
|
||||
.| C.map ((length $!!) . fileContentChunkContent . entityVal)
|
||||
.| takeLimit dlMaxSize
|
||||
where
|
||||
sourceDBFiles = E.selectSource . E.from $ \fileContent -> do
|
||||
sourceDBFiles = E.selectSource . E.from $ \fileContentChunk -> do
|
||||
E.orderBy [E.asc $ E.random_ @Int64]
|
||||
return fileContent
|
||||
return fileContentChunk
|
||||
|
||||
takeLimit n | n <= 0 = return ()
|
||||
takeLimit n = do
|
||||
|
||||
@ -195,7 +195,7 @@ sourcePersonalisedSheetFiles :: forall m.
|
||||
-> Maybe SheetId
|
||||
-> Maybe (Set UserId)
|
||||
-> PersonalisedSheetFilesDownloadAnonymous
|
||||
-> ConduitT () (Either PersonalisedSheetFile File) (SqlPersistT m) ()
|
||||
-> ConduitT () (Either PersonalisedSheetFile DBFile) (SqlPersistT m) ()
|
||||
sourcePersonalisedSheetFiles cId mbsid mbuids anonMode = do
|
||||
(mbIdx, cIDKey) <- lift . newPersonalisedFilesKey $ maybe (Left cId) Right mbsid
|
||||
let
|
||||
@ -255,9 +255,10 @@ sourcePersonalisedSheetFiles cId mbsid mbuids anonMode = do
|
||||
, fileModified = courseParticipantRegistration
|
||||
}
|
||||
yieldM . fmap Right $ do
|
||||
fileContent <- lift $ Just . toStrict <$> formatPersonalisedSheetFilesMeta anonMode cPart cID
|
||||
fileContent' <- lift $ formatPersonalisedSheetFilesMeta anonMode cPart cID
|
||||
let fileTitle = (dirName <//>) . ensureExtension "yaml" . unpack . mr $ MsgPersonalisedSheetFilesMetaFilename cID
|
||||
fileModified = courseParticipantRegistration
|
||||
fileContent = Just $ C.sourceLazy fileContent'
|
||||
return File{..}
|
||||
_dirCache %= Set.insert dirName
|
||||
whenIsJust mbPFile $ \(Entity _ pFile@PersonalisedSheetFile{..}) -> do
|
||||
|
||||
@ -11,8 +11,6 @@ import Handler.Utils.Submission
|
||||
|
||||
import qualified Data.Set as Set
|
||||
|
||||
import qualified Data.Text.Encoding as Text
|
||||
|
||||
import qualified Database.Esqueleto as E
|
||||
|
||||
import qualified Data.Conduit.Combinators as Conduit
|
||||
@ -32,9 +30,8 @@ getSubDownloadR tid ssh csh shn cID (submissionFileTypeIsUpdate -> isUpdate) pat
|
||||
|
||||
case isRating of
|
||||
True
|
||||
| isUpdate -> runDB $ do
|
||||
file <- runMaybeT $ lift . ratingFile cID =<< MaybeT (getRating submissionID)
|
||||
maybe notFound (return . toTypedContent . Text.decodeUtf8) $ fileContent =<< file
|
||||
| isUpdate -> maybe notFound sendThisFile <=< runDB . runMaybeT $
|
||||
lift . ratingFile cID =<< MaybeT (getRating submissionID)
|
||||
| otherwise -> notFound
|
||||
False -> do
|
||||
let results = (.| Conduit.map entityVal) . E.selectSource . E.from $ \sf -> do
|
||||
|
||||
@ -34,11 +34,13 @@ import Control.Monad.Logger
|
||||
|
||||
|
||||
-- | Simply send a `File`-Value
|
||||
sendThisFile :: File -> Handler TypedContent
|
||||
sendThisFile :: DBFile -> Handler TypedContent
|
||||
sendThisFile File{..}
|
||||
| Just fileContent' <- fileContent = do
|
||||
setContentDisposition' . Just $ takeFileName fileTitle
|
||||
return $ TypedContent (simpleContentType (mimeLookup $ pack fileTitle) <> "; charset=utf-8") (toContent fileContent')
|
||||
let cType = simpleContentType (mimeLookup $ pack fileTitle) <> "; charset=utf-8"
|
||||
respondSourceDB cType $
|
||||
fileContent' .| Conduit.map toFlushBuilder
|
||||
| otherwise = sendResponseStatus noContent204 ()
|
||||
|
||||
-- | Serve a single file, identified through a given DB query
|
||||
@ -46,7 +48,7 @@ serveOneFile :: forall file. HasFileReference file => ConduitT () file (YesodDB
|
||||
serveOneFile source = do
|
||||
results <- runDB . runConduit $ source .| Conduit.take 2 -- We don't need more than two files to make a decision below
|
||||
case results of
|
||||
[file] -> sendThisFile =<< runDB (sourceFile' file)
|
||||
[file] -> sendThisFile $ sourceFile' file
|
||||
[] -> notFound
|
||||
_other -> do
|
||||
$logErrorS "SFileR" "Multiple matching files found."
|
||||
@ -58,7 +60,7 @@ serveOneFile source = do
|
||||
serveSomeFiles :: forall file. HasFileReference file => FilePath -> ConduitT () file (YesodDB UniWorX) () -> Handler TypedContent
|
||||
serveSomeFiles archiveName source = serveSomeFiles' archiveName $ source .| C.map Left
|
||||
|
||||
serveSomeFiles' :: forall file. HasFileReference file => FilePath -> ConduitT () (Either file File) (YesodDB UniWorX) () -> Handler TypedContent
|
||||
serveSomeFiles' :: forall file. HasFileReference file => FilePath -> ConduitT () (Either file DBFile) (YesodDB UniWorX) () -> Handler TypedContent
|
||||
serveSomeFiles' archiveName source = do
|
||||
(source', results) <- runDB $ runPeekN 2 source
|
||||
|
||||
@ -66,7 +68,7 @@ serveSomeFiles' archiveName source = do
|
||||
|
||||
case results of
|
||||
[] -> notFound
|
||||
[file] -> sendThisFile =<< either (runDB . sourceFile') return file
|
||||
[file] -> sendThisFile $ either sourceFile' id file
|
||||
_moreFiles -> do
|
||||
setContentDisposition' $ Just archiveName
|
||||
respondSourceDB typeZip $ do
|
||||
@ -79,7 +81,7 @@ serveSomeFiles' archiveName source = do
|
||||
serveZipArchive :: forall file. HasFileReference file => FilePath -> ConduitT () file (YesodDB UniWorX) () -> Handler TypedContent
|
||||
serveZipArchive archiveName source = serveZipArchive' archiveName $ source .| C.map Left
|
||||
|
||||
serveZipArchive' :: forall file. HasFileReference file => FilePath -> ConduitT () (Either file File) (YesodDB UniWorX) () -> Handler TypedContent
|
||||
serveZipArchive' :: forall file. HasFileReference file => FilePath -> ConduitT () (Either file DBFile) (YesodDB UniWorX) () -> Handler TypedContent
|
||||
serveZipArchive' archiveName source = do
|
||||
(source', results) <- runDB $ runPeekN 1 source
|
||||
|
||||
|
||||
@ -276,7 +276,7 @@ storeAllocationResult :: AllocationId
|
||||
-> (AllocationFingerprint, Set (UserId, CourseId), Seq MatchingLogRun)
|
||||
-> DB ()
|
||||
storeAllocationResult allocId now (allocFp, allocMatchings, ppMatchingLog -> allocLog) = do
|
||||
FileReference{..} <- sinkFile $ File "matchings.log" (Just $ encodeUtf8 allocLog) now
|
||||
FileReference{..} <- sinkFile $ File "matchings.log" (Just . yield $ encodeUtf8 allocLog) now
|
||||
insert_ . AllocationMatching allocId allocFp now $ fromMaybe (error "allocation result stored without fileReferenceContent") fileReferenceContent
|
||||
|
||||
doAllocation allocId now allocMatchings
|
||||
|
||||
@ -7,12 +7,13 @@ module Handler.Utils.Files
|
||||
import Import
|
||||
|
||||
import qualified Data.Conduit.Combinators as C
|
||||
import qualified Data.Conduit.List as C (unfoldM)
|
||||
|
||||
import Handler.Utils.Minio
|
||||
import qualified Network.Minio as Minio
|
||||
|
||||
import qualified Data.ByteString.Base64.URL as Base64
|
||||
import qualified Data.ByteArray as ByteArray
|
||||
import qualified Database.Esqueleto as E
|
||||
import qualified Database.Esqueleto.Utils as E
|
||||
|
||||
|
||||
data SourceFilesException
|
||||
@ -22,36 +23,59 @@ data SourceFilesException
|
||||
deriving anyclass (Exception)
|
||||
|
||||
|
||||
sourceFiles :: ConduitT FileReference File (YesodDB UniWorX) ()
|
||||
sourceFiles = C.mapM sourceFile
|
||||
sourceFiles :: Monad m => ConduitT FileReference DBFile m ()
|
||||
sourceFiles = C.map sourceFile
|
||||
|
||||
sourceFile :: FileReference -> DB File
|
||||
sourceFile FileReference{..} = do
|
||||
mFileContent <- traverse get $ FileContentKey <$> fileReferenceContent
|
||||
fileContent <- if
|
||||
| is (_Just . _Nothing) mFileContent
|
||||
, Just fileContentHash <- fileReferenceContent -- Not a restriction
|
||||
-> maybeT (throwM SourceFilesContentUnavailable) $ do
|
||||
let uploadName = decodeUtf8 . Base64.encodeUnpadded $ ByteArray.convert fileContentHash
|
||||
uploadBucket <- getsYesod $ views appSettings appUploadCacheBucket
|
||||
fmap Just . hoistMaybe <=< runAppMinio . runMaybeT $ do
|
||||
objRes <- catchIfMaybeT minioIsDoesNotExist $ Minio.getObject uploadBucket uploadName Minio.defaultGetObjectOptions
|
||||
lift . runConduit $ Minio.gorObjectStream objRes .| C.fold
|
||||
| fmap (fmap fileContentHash) mFileContent /= fmap Just fileReferenceContent
|
||||
-> throwM SourceFilesMismatchedHashes
|
||||
| Just fileContent' <- fileContentContent <$> join mFileContent
|
||||
-> return $ Just fileContent'
|
||||
| otherwise
|
||||
-> return Nothing
|
||||
sourceFile :: FileReference -> DBFile
|
||||
sourceFile FileReference{..} = File
|
||||
{ fileTitle = fileReferenceTitle
|
||||
, fileModified = fileReferenceModified
|
||||
, fileContent = toFileContent <$> fileReferenceContent
|
||||
}
|
||||
where
|
||||
toFileContent fileReference
|
||||
| fileReference == $$(liftTyped $ FileContentReference $$(emptyHash))
|
||||
= return ()
|
||||
toFileContent fileReference = do
|
||||
inDB <- lift . E.selectExists . E.from $ \fileContentEntry -> E.where_ $ fileContentEntry E.^. FileContentEntryHash E.==. E.val fileReference
|
||||
if
|
||||
| inDB -> do
|
||||
dbChunksize <- getsYesod $ view _appFileUploadDBChunksize
|
||||
let retrieveChunk chunkHash = \case
|
||||
Nothing -> return Nothing
|
||||
Just start -> do
|
||||
chunk <- E.selectMaybe . E.from $ \fileContentChunk -> do
|
||||
E.where_ $ fileContentChunk E.^. FileContentChunkId E.==. E.val chunkHash
|
||||
return $ E.substring (fileContentChunk E.^. FileContentChunkContent) (E.val start) (E.val dbChunksize)
|
||||
case chunk of
|
||||
Nothing -> throwM SourceFilesContentUnavailable
|
||||
Just (E.Value c) -> return . Just . (c, ) $ if
|
||||
| olength c >= dbChunksize -> Just $ start + dbChunksize
|
||||
| otherwise -> Nothing
|
||||
chunkHashes = E.selectSource . E.from $ \fileContentEntry -> do
|
||||
E.where_ $ fileContentEntry E.^. FileContentEntryHash E.==. E.val fileReference
|
||||
E.orderBy [ E.asc $ fileContentEntry E.^. FileContentEntryIx ]
|
||||
return $ fileContentEntry E.^. FileContentEntryChunkHash
|
||||
chunkHashes .| C.map E.unValue .| awaitForever (\chunkHash -> C.unfoldM (retrieveChunk chunkHash) $ Just (1 :: Int))
|
||||
| otherwise -> do
|
||||
chunkVar <- newEmptyTMVarIO
|
||||
minioAsync <- lift . allocateLinkedAsync $
|
||||
maybeT (throwM SourceFilesContentUnavailable) $ do
|
||||
let uploadName = minioFileReference # fileReference
|
||||
uploadBucket <- getsYesod $ views appSettings appUploadCacheBucket
|
||||
hoistMaybe <=< runAppMinio . runMaybeT $ do
|
||||
objRes <- catchIfMaybeT minioIsDoesNotExist $ Minio.getObject uploadBucket uploadName Minio.defaultGetObjectOptions
|
||||
lift . runConduit $ Minio.gorObjectStream objRes .| C.mapM_ (atomically . putTMVar chunkVar . Just)
|
||||
atomically $ putTMVar chunkVar Nothing
|
||||
let go = do
|
||||
mChunk <- atomically $ readTMVar chunkVar
|
||||
case mChunk of
|
||||
Nothing -> waitAsync minioAsync
|
||||
Just chunk -> yield chunk >> go
|
||||
in go
|
||||
|
||||
return File
|
||||
{ fileTitle = fileReferenceTitle
|
||||
, fileContent
|
||||
, fileModified = fileReferenceModified
|
||||
}
|
||||
sourceFiles' :: forall file m. (HasFileReference file, Monad m) => ConduitT file DBFile m ()
|
||||
sourceFiles' = C.map sourceFile'
|
||||
|
||||
sourceFiles' :: forall file. HasFileReference file => ConduitT file File (YesodDB UniWorX) ()
|
||||
sourceFiles' = C.mapM sourceFile'
|
||||
|
||||
sourceFile' :: forall file. HasFileReference file => file -> DB File
|
||||
sourceFile' :: forall file. HasFileReference file => file -> DBFile
|
||||
sourceFile' = sourceFile . view (_FileReference . _1)
|
||||
|
||||
@ -32,7 +32,7 @@ import Yesod.Form.Bootstrap3
|
||||
|
||||
import Handler.Utils.Zip
|
||||
import qualified Data.Conduit.Combinators as C
|
||||
import qualified Data.Conduit.List as C (mapMaybe)
|
||||
import qualified Data.Conduit.List as C (mapMaybe, mapMaybeM)
|
||||
|
||||
import qualified Database.Esqueleto as E
|
||||
import qualified Database.Esqueleto.Utils as E
|
||||
@ -831,7 +831,10 @@ pseudonymWordField = checkMMap doCheck id $ ciField & addDatalist (return $ mkOp
|
||||
|
||||
|
||||
uploadContents :: (MonadHandler m, HandlerSite m ~ UniWorX) => ConduitT FileReference ByteString m ()
|
||||
uploadContents = transPipe (liftHandler . runDB) sourceFiles .| C.mapMaybe fileContent
|
||||
uploadContents = transPipe (liftHandler . runDB) sourceFiles .| C.mapMaybeM fileContent'
|
||||
where fileContent' f = runMaybeT $ do
|
||||
File{fileContent = Just fc} <- return f
|
||||
liftHandler . runDB . runConduit $ fc .| C.fold
|
||||
|
||||
data FileFieldUserOption a = FileFieldUserOption
|
||||
{ fieldOptionForce :: Bool
|
||||
@ -893,11 +896,21 @@ genericFileField mkOpts = Field{..}
|
||||
, Map.filter (views _3 $ (&&) <$> fieldOptionForce <*> fieldOptionDefault) fieldAdditionalFiles
|
||||
]
|
||||
|
||||
handleUpload :: FileField -> Maybe Text -> ConduitT File FileReference (YesodDB UniWorX) ()
|
||||
handleUpload :: FileField -> Maybe Text -> ConduitT (File Handler) FileReference (YesodDB UniWorX) ()
|
||||
handleUpload FileField{fieldMaxFileSize} mIdent
|
||||
= C.filter (\File{..} -> maybe (const True) (>) fieldMaxFileSize $ maybe 0 (fromIntegral . olength) fileContent)
|
||||
.| sinkFiles
|
||||
.| C.mapM mkSessionFile
|
||||
= C.map (transFile liftHandler)
|
||||
.| C.mapMaybeM (\f@File{..} -> maybeT (return $ Just f) $ do
|
||||
maxSize <- fromIntegral <$> hoistMaybe fieldMaxFileSize
|
||||
fc <- hoistMaybe fileContent
|
||||
let peekNE n = do
|
||||
str <- C.takeE n .| C.fold
|
||||
leftover str
|
||||
yield str
|
||||
(unsealConduitT -> fc', size) <- lift $ fc $$+ peekNE (succ maxSize) .| C.lengthE
|
||||
return . guardOn (size <= maxSize) $ f { fileContent = Just fc' }
|
||||
)
|
||||
.| sinkFiles
|
||||
.| C.mapM mkSessionFile
|
||||
where
|
||||
mkSessionFile fRef@FileReference{..} = fRef <$ do
|
||||
now <- liftIO getCurrentTime
|
||||
@ -924,7 +937,7 @@ genericFileField mkOpts = Field{..}
|
||||
doUnpack
|
||||
| fieldOptionForce fieldUnpackZips = fieldOptionDefault fieldUnpackZips
|
||||
| otherwise = unpackZips `elem` vals
|
||||
handleFile :: FileInfo -> ConduitT () File Handler ()
|
||||
handleFile :: FileInfo -> ConduitT () (File Handler) Handler ()
|
||||
handleFile
|
||||
| doUnpack = receiveFiles
|
||||
| otherwise = yieldM . acceptFile
|
||||
|
||||
@ -12,9 +12,7 @@ import Handler.Utils.Files
|
||||
|
||||
import qualified Data.CaseInsensitive as CI
|
||||
|
||||
import qualified Data.ByteString.Lazy as LBS
|
||||
|
||||
import qualified Data.Conduit.List as C
|
||||
import qualified Data.Conduit.Combinators as C
|
||||
|
||||
import qualified Text.Pandoc as P
|
||||
|
||||
@ -72,12 +70,13 @@ addFileDB :: ( MonadMail m
|
||||
, HandlerSite m ~ UniWorX
|
||||
) => FileReference -> m (Maybe MailObjectId)
|
||||
addFileDB fRef = runMaybeT $ do
|
||||
File{fileTitle = pack . takeBaseName -> fileName, fileContent = Just fileContent} <- lift . liftHandler . runDB $ sourceFile fRef
|
||||
File{fileTitle = pack . takeBaseName -> fileName, fileContent = Just fileContent'} <- return $ sourceFile fRef
|
||||
fileContent <- liftHandler . runDB . runConduit $ fileContent' .| C.sinkLazy
|
||||
lift . addPart $ do
|
||||
_partType .= decodeUtf8 (mimeLookup fileName)
|
||||
_partEncoding .= Base64
|
||||
_partDisposition .= AttachmentDisposition fileName
|
||||
_partContent .= PartContent (LBS.fromStrict fileContent)
|
||||
_partContent .= PartContent fileContent
|
||||
setMailObjectIdPseudorandom (fileName, fileContent) :: StateT Part (HandlerFor UniWorX) MailObjectId
|
||||
|
||||
|
||||
|
||||
@ -16,11 +16,9 @@ import Handler.Utils.DateTime (getDateTimeFormatter)
|
||||
|
||||
import qualified Data.Text as Text
|
||||
|
||||
import qualified Data.ByteString.Lazy as Lazy.ByteString
|
||||
|
||||
import qualified Database.Esqueleto as E
|
||||
|
||||
import qualified Data.Conduit.List as Conduit
|
||||
import qualified Data.Conduit.Combinators as C
|
||||
|
||||
import Handler.Utils.Rating.Format
|
||||
|
||||
@ -91,15 +89,16 @@ extensionRating = "txt"
|
||||
|
||||
ratingFile :: ( MonadHandler m
|
||||
, HandlerSite m ~ UniWorX
|
||||
, Monad m'
|
||||
)
|
||||
=> CryptoFileNameSubmission -> Rating -> m File
|
||||
=> CryptoFileNameSubmission -> Rating -> m (File m')
|
||||
ratingFile cID rating@Rating{ ratingValues = Rating'{..} } = do
|
||||
mr'@(MsgRenderer mr) <- getMsgRenderer
|
||||
dtFmt <- getDateTimeFormatter
|
||||
fileModified <- maybe (liftIO getCurrentTime) return ratingTime
|
||||
let
|
||||
fileTitle = ensureExtension extensionRating . unpack . mr $ MsgRatingFileTitle cID
|
||||
fileContent = Just . Lazy.ByteString.toStrict $ formatRating mr' dtFmt cID rating
|
||||
fileContent = Just . C.sourceLazy $ formatRating mr' dtFmt cID rating
|
||||
return File{..}
|
||||
|
||||
type SubmissionContent = Either FileReference (SubmissionId, Rating')
|
||||
@ -107,13 +106,12 @@ type SubmissionContent = Either FileReference (SubmissionId, Rating')
|
||||
extractRatings :: ( MonadHandler m
|
||||
, HandlerSite m ~ UniWorX
|
||||
) => ConduitT FileReference SubmissionContent m ()
|
||||
extractRatings = Conduit.mapM $ \fRef@FileReference{..} -> liftHandler $ do
|
||||
extractRatings = C.mapM $ \fRef@FileReference{..} -> liftHandler $ do
|
||||
msId <- isRatingFile fileReferenceTitle
|
||||
if
|
||||
| Just sId <- msId
|
||||
, isJust fileReferenceContent -> do
|
||||
f <- runDB $ sourceFile fRef
|
||||
(rating, cID) <- handle (throwM . RatingFileException fileReferenceTitle) $ parseRating f
|
||||
(rating, cID) <- handle (throwM . RatingFileException fileReferenceTitle) . runDB . parseRating $ sourceFile fRef
|
||||
sId' <- traverse decrypt cID
|
||||
unless (maybe (const True) (==) sId' sId) $
|
||||
throwM $ RatingFileException fileReferenceTitle RatingSubmissionIDIncorrect
|
||||
|
||||
@ -35,6 +35,8 @@ import qualified System.FilePath.Cryptographic as Explicit
|
||||
|
||||
import Control.Exception (ErrorCall(..))
|
||||
|
||||
import qualified Data.Conduit.Combinators as C
|
||||
|
||||
|
||||
data PrettifyState
|
||||
= PrettifyInitial
|
||||
@ -195,8 +197,9 @@ instance ns ~ CryptoIDNamespace (CI FilePath) SubmissionId => YAML.FromYAML (May
|
||||
)
|
||||
|
||||
|
||||
parseRating :: MonadCatch m => File -> m (Rating', Maybe CryptoFileNameSubmission)
|
||||
parseRating f@File{ fileContent = Just (fromStrict -> input), .. } = handle onFailure . handle (throwM . RatingParseException) . handleIf isYAMLUnicodeError (\(ErrorCall msg) -> throwM $ RatingYAMLNotUnicode msg) $ do
|
||||
parseRating :: MonadCatch m => File m -> m (Rating', Maybe CryptoFileNameSubmission)
|
||||
parseRating f@File{ fileContent = Just input', .. } = handle onFailure . handle (throwM . RatingParseException) . handleIf isYAMLUnicodeError (\(ErrorCall msg) -> throwM $ RatingYAMLNotUnicode msg) $ do
|
||||
input <- runConduit $ input' .| C.sinkLazy
|
||||
let evStream = YAML.Event.parseEvents input
|
||||
delimitDocument = do
|
||||
ev <- maybe (throwM RatingYAMLStreamTerminatedUnexpectedly) return =<< await
|
||||
|
||||
@ -16,6 +16,8 @@ import qualified Data.ByteString.Lazy as Lazy (ByteString)
|
||||
|
||||
import qualified Data.CaseInsensitive as CI
|
||||
|
||||
import qualified Data.Conduit.Combinators as C
|
||||
|
||||
|
||||
import Text.Read (readEither)
|
||||
|
||||
@ -55,9 +57,9 @@ formatRating cID Rating{ ratingValues = Rating'{..}, ..} = let
|
||||
]
|
||||
in Lazy.Text.encodeUtf8 . (<> "\n") $ displayT doc
|
||||
|
||||
parseRating :: MonadCatch m => File -> m Rating'
|
||||
parseRating :: MonadCatch m => File m -> m Rating'
|
||||
parseRating File{ fileContent = Just input, .. } = handle (throwM . RatingParseLegacyException) $ do
|
||||
inputText <- either (throwM . RatingNotUnicode) return $ Text.decodeUtf8' input
|
||||
inputText <- either (throwM . RatingNotUnicode) return . Text.decodeUtf8' =<< runConduit (input .| C.fold)
|
||||
let
|
||||
(headerLines', commentLines) = break (commentSep `Text.isInfixOf`) $ Text.lines inputText
|
||||
(reverse -> ratingLines, reverse -> _headerLines) = break (sep' `Text.isInfixOf`) $ reverse headerLines'
|
||||
|
||||
@ -256,7 +256,7 @@ planSubmissions sid restriction = do
|
||||
maximumsBy f xs = flip Set.filter xs $ \x -> maybe True (((==) `on` f) x . maximumBy (comparing f)) $ fromNullable xs
|
||||
|
||||
|
||||
submissionFileSource :: SubmissionId -> ConduitT () File (YesodDB UniWorX) ()
|
||||
submissionFileSource :: SubmissionId -> ConduitT () DBFile (YesodDB UniWorX) ()
|
||||
submissionFileSource subId = E.selectSource (E.from $ submissionFileQuery subId)
|
||||
.| C.map entityVal
|
||||
.| sourceFiles'
|
||||
@ -319,7 +319,7 @@ submissionMultiArchive anonymous (Set.toList -> ids) = do
|
||||
setContentDisposition' $ Just ((addExtension `on` unpack) (mr archiveName) extensionZip)
|
||||
respondSource typeZip . (<* lift cleanup) . transPipe (runDBRunner dbrunner) $ do
|
||||
let
|
||||
fileEntitySource' :: (Rating, Entity Submission, Maybe UTCTime, (SheetName,CourseShorthand,SchoolId,TermId,Bool)) -> ConduitT () File (YesodDB UniWorX) ()
|
||||
fileEntitySource' :: (Rating, Entity Submission, Maybe UTCTime, (SheetName,CourseShorthand,SchoolId,TermId,Bool)) -> ConduitT () DBFile (YesodDB UniWorX) ()
|
||||
fileEntitySource' (rating, Entity submissionID Submission{}, subTime, (shn,csh,ssh,tid,sheetAnonymous)) = do
|
||||
cID <- encrypt submissionID
|
||||
|
||||
|
||||
@ -17,11 +17,6 @@ import Codec.Archive.Zip.Conduit.Types
|
||||
import Codec.Archive.Zip.Conduit.UnZip
|
||||
import Codec.Archive.Zip.Conduit.Zip
|
||||
|
||||
-- import qualified Data.ByteString.Lazy as Lazy (ByteString)
|
||||
import qualified Data.ByteString.Lazy as Lazy.ByteString
|
||||
|
||||
import qualified Data.ByteString as ByteString
|
||||
|
||||
import System.FilePath
|
||||
import Data.Time.LocalTime (localTimeToUTC, utcToLocalTime)
|
||||
|
||||
@ -38,6 +33,8 @@ import Data.Encoding ( decodeStrictByteStringExplicit
|
||||
import Data.Encoding.CP437
|
||||
import qualified Data.Char as Char
|
||||
|
||||
import Control.Monad.Trans.Cont
|
||||
|
||||
|
||||
typeZip :: ContentType
|
||||
typeZip = "application/zip"
|
||||
@ -53,49 +50,54 @@ instance Default ZipInfo where
|
||||
}
|
||||
|
||||
|
||||
consumeZip :: forall b m.
|
||||
consumeZip :: forall b m m'.
|
||||
( MonadThrow b
|
||||
, MonadThrow m
|
||||
, MonadBase b m
|
||||
, PrimMonad b
|
||||
, MonadUnliftIO m
|
||||
, MonadResource m
|
||||
, MonadIO m'
|
||||
)
|
||||
=> ConduitT ByteString File m ZipInfo
|
||||
consumeZip = transPipe liftBase unZipStream `fuseUpstream` consumeZip'
|
||||
where
|
||||
consumeZip' :: ConduitT (Either ZipEntry ByteString) File m ()
|
||||
consumeZip' = do
|
||||
input <- await
|
||||
case input of
|
||||
Nothing -> return ()
|
||||
Just (Right _) -> throwM $ userError "Data chunk in unexpected place when parsing ZIP"
|
||||
Just (Left ZipEntry{..}) -> do
|
||||
contentChunks <- toConsumer accContents
|
||||
zipEntryName' <- decodeZipEntryName zipEntryName
|
||||
let
|
||||
fileTitle = dropWhile isPathSeparator . dropTrailingPathSeparator . normalise $ makeValid zipEntryName'
|
||||
fileModified = localTimeToUTC utc zipEntryTime
|
||||
fileContent
|
||||
| hasTrailingPathSeparator zipEntryName' = Nothing
|
||||
| otherwise = Just $ mconcat contentChunks
|
||||
yield File{..}
|
||||
consumeZip'
|
||||
accContents :: ConduitT (Either a b') Void m [b']
|
||||
accContents = do
|
||||
input <- await
|
||||
case input of
|
||||
Just (Right x) -> (x :) <$> accContents
|
||||
Just (Left x) -> [] <$ leftover (Left x)
|
||||
_ -> return []
|
||||
=> ConduitT () ByteString m () -> ConduitT () (File m') m ZipInfo
|
||||
consumeZip inpBS = do
|
||||
inps <- liftIO newBroadcastTMChanIO
|
||||
let feedSingle inp = atomically $ do
|
||||
guardM $ isEmptyTMChan inps
|
||||
writeTMChan inps inp
|
||||
zipAsync <- lift . allocateLinkedAsync . runConduit $ do
|
||||
zipInfo <- (inpBS .| transPipe liftBase unZipStream) `fuseUpstream` C.mapM_ feedSingle
|
||||
atomically $ closeTMChan inps
|
||||
return zipInfo
|
||||
|
||||
produceZip :: forall b m.
|
||||
( MonadThrow b
|
||||
, MonadThrow m
|
||||
, MonadBase b m
|
||||
, PrimMonad b
|
||||
evalContT . callCC $ \finish -> forever $ do
|
||||
(fileChan, fileDef) <- atomically $ do
|
||||
fileChan <- dupTMChan inps
|
||||
fileDef <- readTMChan fileChan
|
||||
return (fileChan, fileDef)
|
||||
case fileDef of
|
||||
Nothing -> finish =<< waitAsync zipAsync
|
||||
Just (Right _) -> return ()
|
||||
Just (Left ZipEntry{..}) -> do
|
||||
zipEntryName' <- decodeZipEntryName zipEntryName
|
||||
let
|
||||
fileTitle = dropWhile isPathSeparator . dropTrailingPathSeparator . normalise $ makeValid zipEntryName'
|
||||
fileModified = localTimeToUTC utc zipEntryTime
|
||||
isDirectory = hasTrailingPathSeparator zipEntryName'
|
||||
fileContent
|
||||
| isDirectory = Nothing
|
||||
| otherwise = Just . evalContT . callCC $ \finishContent -> forever $ do
|
||||
nextVal <- atomically $ (preview _Right =<<) <$> readTMChan fileChan
|
||||
maybe (finishContent ()) (lift . yield) nextVal
|
||||
lift $ yield File{..}
|
||||
|
||||
produceZip :: forall m.
|
||||
( MonadThrow m
|
||||
, PrimMonad m
|
||||
)
|
||||
=> ZipInfo
|
||||
-> ConduitT File ByteString m ()
|
||||
produceZip info = C.map toZipData .| transPipe liftBase (void $ zipStream zipOptions)
|
||||
-> ConduitT (File m) ByteString m ()
|
||||
produceZip info = C.map toZipData .| void (zipStream zipOptions)
|
||||
where
|
||||
zipOptions = ZipOptions
|
||||
{ zipOpt64 = True
|
||||
@ -103,13 +105,11 @@ produceZip info = C.map toZipData .| transPipe liftBase (void $ zipStream zipOpt
|
||||
, zipOptInfo = info
|
||||
}
|
||||
|
||||
toZipData :: File -> (ZipEntry, ZipData b)
|
||||
toZipData f@File{..} =
|
||||
let zData = maybe mempty (ZipDataByteString . Lazy.ByteString.fromStrict) fileContent
|
||||
zEntry = (toZipEntry f){ zipEntrySize = fromIntegral . ByteString.length <$> fileContent }
|
||||
in (zEntry, zData)
|
||||
toZipData :: File m -> (ZipEntry, ZipData m)
|
||||
toZipData f@File{..}
|
||||
= (toZipEntry f, ) $ maybe mempty ZipDataSource fileContent
|
||||
|
||||
toZipEntry :: File -> ZipEntry
|
||||
toZipEntry :: File m -> ZipEntry
|
||||
toZipEntry File{..} = ZipEntry{..}
|
||||
where
|
||||
isDir = isNothing fileContent
|
||||
@ -119,26 +119,26 @@ produceZip info = C.map toZipData .| transPipe liftBase (void $ zipStream zipOpt
|
||||
zipEntrySize = Nothing
|
||||
zipEntryExternalAttributes = Nothing
|
||||
|
||||
modifyFileTitle :: Monad m => (FilePath -> FilePath) -> ConduitT File File m ()
|
||||
modifyFileTitle :: Monad m => (FilePath -> FilePath) -> ConduitT (File m') (File m') m ()
|
||||
modifyFileTitle f = mapC $ \x@File{..} -> x{ fileTitle = f fileTitle }
|
||||
|
||||
-- Takes FileInfo and if it is a ZIP-Archive, extract files, otherwiese yield fileinfo
|
||||
receiveFiles :: (MonadLogger m, MonadResource m, MonadThrow m, MonadBase IO m) => FileInfo -> ConduitT () File m ()
|
||||
receiveFiles :: (MonadLogger m, MonadResource m, MonadThrow m, MonadBase IO m, MonadUnliftIO m, MonadResource m') => FileInfo -> ConduitT () (File m') m ()
|
||||
receiveFiles fInfo
|
||||
| ((==) `on` simpleContentType) mimeType typeZip = do
|
||||
$logInfoS "sourceFiles" "Unpacking ZIP"
|
||||
fileSource fInfo .| void consumeZip
|
||||
void . consumeZip $ fileSource fInfo
|
||||
| otherwise = do
|
||||
$logDebugS "sourceFiles" [st|Not unpacking file of type #{decodeUtf8 mimeType}|]
|
||||
yieldM $ acceptFile fInfo
|
||||
where
|
||||
mimeType = mimeLookup $ fileName fInfo
|
||||
|
||||
acceptFile :: MonadResource m => FileInfo -> m File
|
||||
acceptFile :: (MonadResource m, MonadResource m') => FileInfo -> m (File m')
|
||||
acceptFile fInfo = do
|
||||
let fileTitle = dropWhile isPathSeparator . dropTrailingPathSeparator . normalise . unpack $ fileName fInfo
|
||||
fileContent = Just $ fileSource fInfo
|
||||
fileModified <- liftIO getCurrentTime
|
||||
fileContent <- fmap Just . runConduit $ fileSource fInfo .| foldC
|
||||
return File{..}
|
||||
|
||||
|
||||
|
||||
@ -128,6 +128,8 @@ import Data.Proxy as Import (Proxy(..))
|
||||
|
||||
import Data.List.PointedList as Import (PointedList)
|
||||
|
||||
import Language.Haskell.TH.Syntax as Import (Lift(liftTyped))
|
||||
|
||||
import Language.Haskell.TH.Instances as Import ()
|
||||
import Data.NonNull.Instances as Import ()
|
||||
import Data.Monoid.Instances as Import ()
|
||||
|
||||
@ -9,20 +9,15 @@ import Import hiding (matching)
|
||||
import Database.Persist.Sql (deleteWhereCount)
|
||||
|
||||
import qualified Database.Esqueleto as E
|
||||
import qualified Database.Esqueleto.PostgreSQL as E
|
||||
import qualified Database.Esqueleto.Utils as E
|
||||
import qualified Database.Esqueleto.Internal.Sql as E (unsafeSqlCastAs)
|
||||
|
||||
import qualified Data.Conduit.Combinators as C
|
||||
import qualified Data.Conduit.List as C (mapMaybe)
|
||||
import qualified Data.Conduit.List as C (mapMaybe, unfoldM)
|
||||
|
||||
import Handler.Utils.Minio
|
||||
import qualified Network.Minio as Minio
|
||||
|
||||
import qualified Crypto.Hash as Crypto
|
||||
import qualified Data.ByteString.Base64.URL as Base64
|
||||
|
||||
import Control.Monad.Memo (startEvalMemoT, memo)
|
||||
|
||||
|
||||
dispatchJobPruneSessionFiles :: JobHandler UniWorX
|
||||
dispatchJobPruneSessionFiles = JobHandlerAtomic . hoist lift $ do
|
||||
@ -44,6 +39,9 @@ fileReferences (E.just -> fHash)
|
||||
, E.from $ \subFile -> E.where_ $ subFile E.^. SubmissionFileContent E.==. fHash
|
||||
, E.from $ \sessFile -> E.where_ $ sessFile E.^. SessionFileContent E.==. fHash
|
||||
, E.from $ \lock -> E.where_ $ E.just (lock E.^. FileLockContent) E.==. fHash
|
||||
, E.from $ \chunkLock -> E.where_ . E.exists . E.from $ \fileContentEntry ->
|
||||
E.where_ $ E.just (fileContentEntry E.^. FileContentEntryHash) E.==. fHash
|
||||
E.&&. chunkLock E.^. FileChunkLockHash E.==. E.subSelectForeign fileContentEntry FileContentEntryChunkHash (E.^. FileContentChunkHash)
|
||||
]
|
||||
|
||||
|
||||
@ -53,63 +51,111 @@ dispatchJobPruneUnreferencedFiles = JobHandlerAtomic . hoist lift $ do
|
||||
now <- liftIO getCurrentTime
|
||||
interval <- fmap (fmap $ max 0) . getsYesod $ view _appPruneUnreferencedFiles
|
||||
keep <- fmap (max 0) . getsYesod $ view _appKeepUnreferencedFiles
|
||||
|
||||
E.insertSelectWithConflict
|
||||
(UniqueFileContentChunkUnreferenced $ error "insertSelectWithConflict inspected constraint")
|
||||
(E.from $ \fileContentChunk -> do
|
||||
E.where_ . E.not_ . E.subSelectOr . E.from $ \fileContentEntry -> do
|
||||
E.where_ $ fileContentEntry E.^. FileContentEntryChunkHash E.==. fileContentChunk E.^. FileContentChunkId
|
||||
return . E.any E.exists . fileReferences $ fileContentEntry E.^. FileContentEntryHash
|
||||
return $ FileContentChunkUnreferenced E.<# (fileContentChunk E.^. FileContentChunkId) E.<&> E.val now
|
||||
)
|
||||
(\current excluded ->
|
||||
[ FileContentChunkUnreferencedSince E.=. E.min (current E.^. FileContentChunkUnreferencedSince) (excluded E.^. FileContentChunkUnreferencedSince) ]
|
||||
)
|
||||
|
||||
E.update $ \fileContent -> do
|
||||
let isReferenced = E.any E.exists . fileReferences $ fileContent E.^. FileContentHash
|
||||
now' = E.unsafeSqlCastAs "TIMESTAMP WITH TIME ZONE" $ E.val now
|
||||
shouldBe = E.bool (E.just . E.maybe now' (E.min now') $ fileContent E.^. FileContentUnreferencedSince) E.nothing isReferenced
|
||||
E.set fileContent [ FileContentUnreferencedSince E.=. shouldBe ]
|
||||
E.delete . E.from $ \fileContentChunkUnreferenced ->
|
||||
E.where_ . E.subSelectOr . E.from $ \fileContentEntry -> do
|
||||
E.where_ $ fileContentEntry E.^. FileContentEntryChunkHash E.==. fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedHash
|
||||
return . E.any E.exists . fileReferences $ fileContentEntry E.^. FileContentEntryHash
|
||||
|
||||
let
|
||||
getCandidates = E.selectSource . E.from $ \fileContent -> do
|
||||
E.where_ . E.maybe E.false (E.<. E.val (addUTCTime (-keep) now)) $ fileContent E.^. FileContentUnreferencedSince
|
||||
return ( fileContent E.^. FileContentHash
|
||||
, E.length_ $ fileContent E.^. FileContentContent
|
||||
getEntryCandidates = E.selectSource . E.from $ \fileContentEntry -> do
|
||||
let unreferencedSince = E.subSelectMaybe . E.from $ \(fileContentEntry' `E.InnerJoin` fileContentChunkUnreferenced) -> do
|
||||
E.on $ fileContentEntry' E.^. FileContentEntryChunkHash E.==. fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedHash
|
||||
E.where_ $ fileContentEntry' E.^. FileContentEntryHash E.==. fileContentEntry E.^. FileContentEntryHash
|
||||
return . E.max_ $ fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedSince
|
||||
E.where_ $ E.maybe E.false (E.<. E.val (addUTCTime (-keep) now)) unreferencedSince
|
||||
|
||||
E.groupBy $ fileContentEntry E.^. FileContentEntryHash
|
||||
E.orderBy [ E.asc $ fileContentEntry E.^. FileContentEntryHash ]
|
||||
|
||||
return $ fileContentEntry E.^. FileContentEntryHash
|
||||
|
||||
deleteEntry :: _ -> DB (Sum Natural)
|
||||
deleteEntry (E.Value fRef) =
|
||||
bool 0 1 . (> 0) <$> deleteWhereCount [FileContentEntryHash ==. fRef]
|
||||
|
||||
Sum deletedEntries <- runConduit $
|
||||
getEntryCandidates
|
||||
.| maybe (C.map id) (takeWhileTime . (/ 3)) interval
|
||||
.| C.mapM deleteEntry
|
||||
.| C.fold
|
||||
|
||||
when (deletedEntries > 0) $
|
||||
$logInfoS "PruneUnreferencedFiles" [st|Deleted #{tshow deletedEntries} long-unreferenced files|]
|
||||
|
||||
let
|
||||
getChunkCandidates = E.selectSource . E.from $ \fileContentChunkUnreferenced -> do
|
||||
E.where_ $ fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedSince E.<. E.val (addUTCTime (-keep) now)
|
||||
E.where_ . E.not_ . E.exists . E.from $ \fileContentEntry ->
|
||||
E.where_ $ fileContentEntry E.^. FileContentEntryChunkHash E.==. fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedHash
|
||||
|
||||
return ( fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedHash
|
||||
, E.subSelectForeign fileContentChunkUnreferenced FileContentChunkUnreferencedHash $ E.length_ . (E.^. FileContentChunkContent)
|
||||
)
|
||||
|
||||
Sum deleted <- runConduit $
|
||||
getCandidates
|
||||
.| maybe (C.map id) (takeWhileTime . (/ 2)) interval
|
||||
deleteChunk :: _ -> DB (Sum Natural, Sum Word64)
|
||||
deleteChunk (E.Value cRef, E.Value size) = do
|
||||
deleteWhere [ FileContentChunkUnreferencedHash ==. cRef ]
|
||||
(, Sum size) . fromIntegral <$> deleteWhereCount [FileContentChunkHash ==. unFileContentChunkKey cRef]
|
||||
|
||||
(Sum deletedChunks, Sum deletedChunkSize) <- runConduit $
|
||||
getChunkCandidates
|
||||
.| maybe (C.map id) (takeWhileTime . (/ 3)) interval
|
||||
.| persistentTokenBucketTakeC' TokenBucketPruneFiles (view $ _2 . _Value :: _ -> Word64)
|
||||
.| C.map (view $ _1 . _Value)
|
||||
.| C.mapM (\fRef -> Sum <$> deleteWhereCount [FileContentHash ==. fRef])
|
||||
.| C.mapM deleteChunk
|
||||
.| C.fold
|
||||
when (deleted > 0) $
|
||||
$logInfoS "PruneUnreferencedFiles" [st|Deleted #{deleted} long-unreferenced files|]
|
||||
|
||||
when (deletedChunks > 0) $
|
||||
$logInfoS "PruneUnreferencedFiles" [st|Deleted #{tshow deletedChunks} chunks (#{tshow deletedChunkSize} bytes)|]
|
||||
|
||||
|
||||
dispatchJobInjectFiles :: JobHandler UniWorX
|
||||
dispatchJobInjectFiles = JobHandlerException . maybeT (return ()) $ do
|
||||
uploadBucket <- getsYesod $ view _appUploadCacheBucket
|
||||
interval <- getsYesod $ view _appInjectFiles
|
||||
now <- liftIO getCurrentTime
|
||||
|
||||
let
|
||||
extractReference (Minio.ListItemObject oi)
|
||||
| Right bs <- Base64.decodeUnpadded . encodeUtf8 $ Minio.oiObject oi
|
||||
, Just fRef <- Crypto.digestFromByteString bs
|
||||
= Just (oi, fRef)
|
||||
extractReference (Minio.ListItemObject oi) = (oi, ) <$> Minio.oiObject oi ^? minioFileReference
|
||||
extractReference _ = Nothing
|
||||
|
||||
injectOrDelete :: (Minio.Object, FileContentReference)
|
||||
-> Handler (Sum Int64, Sum Int64) -- ^ Injected, Already existed
|
||||
injectOrDelete (obj, fRef) = maybeT (return mempty) $ do
|
||||
res <- hoist (startEvalMemoT . hoistStateCache (runDB . setSerializable)) $ do
|
||||
alreadyInjected <- lift . lift $ exists [ FileContentHash ==. fRef ]
|
||||
if | alreadyInjected -> return (mempty, Sum 1)
|
||||
| otherwise -> do
|
||||
content <- flip memo obj $ \obj' -> hoistMaybeM . runAppMinio . runMaybeT $ do
|
||||
objRes <- catchIfMaybeT minioIsDoesNotExist $ Minio.getObject uploadBucket obj' Minio.defaultGetObjectOptions
|
||||
lift . runConduit $ Minio.gorObjectStream objRes .| C.fold
|
||||
-> Handler (Sum Int64) -- ^ Injected
|
||||
injectOrDelete (obj, fRef) = do
|
||||
fRef' <- runDB . setSerializable $ do
|
||||
chunkVar <- newEmptyTMVarIO
|
||||
dbAsync <- allocateLinkedAsync $ do
|
||||
atomically $ isEmptyTMVar chunkVar >>= guard . not
|
||||
sinkFileDB $ C.unfoldM (\x -> fmap (, x) <$> atomically (takeTMVar chunkVar)) ()
|
||||
|
||||
fmap ((, mempty) . Sum) . lift. lift . E.insertSelectCount $
|
||||
let isReferenced = E.any E.exists $ fileReferences (E.val fRef)
|
||||
now' = E.unsafeSqlCastAs "TIMESTAMP WITH TIME ZONE" $ E.val now
|
||||
in return $ FileContent E.<# E.val fRef E.<&> E.val content E.<&> E.bool (E.just now') E.nothing isReferenced
|
||||
runAppMinio . maybeT (return ()) . catchIfMaybeT minioIsDoesNotExist $ Minio.removeObject uploadBucket obj
|
||||
return res
|
||||
didSend <- maybeT (return False) . hoistMaybeM . runAppMinio . runMaybeT $ do
|
||||
objRes <- catchIfMaybeT minioIsDoesNotExist $ Minio.getObject uploadBucket obj Minio.defaultGetObjectOptions
|
||||
lift . runConduit $ Minio.gorObjectStream objRes .| C.mapM_ (atomically . putTMVar chunkVar . Just)
|
||||
return True
|
||||
if
|
||||
| not didSend -> Nothing <$ cancel dbAsync
|
||||
| otherwise -> do
|
||||
atomically $ putTMVar chunkVar Nothing
|
||||
Just <$> waitAsync dbAsync
|
||||
let matchesFRef = is _Just $ assertM (== fRef) fRef'
|
||||
if | matchesFRef ->
|
||||
maybeT (return ()) . runAppMinio . handleIf minioIsDoesNotExist (const $ return ()) $ Minio.removeObject uploadBucket obj
|
||||
| otherwise ->
|
||||
$logErrorS "InjectFiles" [st|Minio object “#{obj}”'s content does not match it's name (content hash: #{tshow fRef'} /= name hash: #{tshow fRef})|]
|
||||
return . bool mempty (Sum 1) $ is _Just fRef'
|
||||
|
||||
(Sum inj, Sum exc) <-
|
||||
Sum inj <-
|
||||
runConduit $ transPipe runAppMinio (Minio.listObjects uploadBucket Nothing True)
|
||||
.| C.mapMaybe extractReference
|
||||
.| maybe (C.map id) (takeWhileTime . (/ 2)) interval
|
||||
@ -118,7 +164,5 @@ dispatchJobInjectFiles = JobHandlerException . maybeT (return ()) $ do
|
||||
.| transPipe lift (C.mapM injectOrDelete)
|
||||
.| C.fold
|
||||
|
||||
when (exc > 0) $
|
||||
$logInfoS "InjectFiles" [st|Deleted #{exc} files from upload cache because they were already injected|]
|
||||
when (inj > 0) $
|
||||
$logInfoS "InjectFiles" [st|Injected #{inj} files from upload cache into database|]
|
||||
|
||||
@ -915,13 +915,26 @@ customMigrations = Map.fromListWith (>>)
|
||||
|
||||
)
|
||||
, ( AppliedMigrationKey [migrationVersion|39.0.0|] [version|40.0.0|]
|
||||
, whenM (tableExists "study_features") $ do
|
||||
, whenM (tableExists "study_features") $
|
||||
[executeQQ|
|
||||
ALTER TABLE study_features RENAME updated TO last_observed;
|
||||
ALTER TABLE study_features ADD COLUMN first_observed timestamp with time zone;
|
||||
UPDATE study_features SET first_observed = (SELECT MAX(last_observed) FROM study_features as other WHERE other."user" = study_features."user" AND other.degree = study_features.degree AND other.field = study_features.field AND other.type = study_features.type AND other.semester = study_features.semester - 1);
|
||||
|]
|
||||
)
|
||||
, ( AppliedMigrationKey [migrationVersion|40.0.0|] [version|41.0.0|]
|
||||
, whenM (tableExists "file_content") $
|
||||
[executeQQ|
|
||||
ALTER TABLE file_content RENAME TO file_content_chunk;
|
||||
|
||||
CREATE TABLE file_content_chunk_unreferenced (id bigserial, hash bytea NOT NULL, since timestamp with time zone NOT NULL);
|
||||
INSERT INTO file_content_chunk_unreferenced (since, hash) (SELECT unreferenced_since as since, hash FROM file_content_chunk WHERE NOT (unreferenced_since IS NULL));
|
||||
ALTER TABLE file_content_chunk_chunk DROP COLUMN unreferenced_since;
|
||||
|
||||
CREATE TABLE file_content_entry (hash bytea NOT NULL, ix bigint NOT NULL, chunk_hash bytea NOT NULL);
|
||||
INSERT INTO file_content_entry (hash, chunk_hash, ix) (SELECT hash, hash as chunk_hash, 0 as ix FROM file_content_chunk);
|
||||
|]
|
||||
)
|
||||
]
|
||||
|
||||
|
||||
|
||||
@ -54,5 +54,3 @@ type InstanceId = UUID
|
||||
type ClusterId = UUID
|
||||
type TokenId = UUID
|
||||
type TermCandidateIncidence = UUID
|
||||
|
||||
type FileContentReference = Digest SHA3_512
|
||||
|
||||
@ -1,23 +1,65 @@
|
||||
module Model.Types.File
|
||||
( File(..), _fileTitle, _fileContent, _fileModified
|
||||
( FileContentChunkReference(..), FileContentReference(..)
|
||||
, File(..), _fileTitle, _fileContent, _fileModified
|
||||
, transFile
|
||||
, minioFileReference
|
||||
, FileReference(..), _fileReferenceTitle, _fileReferenceContent, _fileReferenceModified
|
||||
, HasFileReference(..), IsFileReference(..), FileReferenceResidual(..)
|
||||
) where
|
||||
|
||||
import Import.NoModel
|
||||
import Model.Types.Common (FileContentReference)
|
||||
|
||||
import Database.Persist.Sql (PersistFieldSql)
|
||||
import Web.HttpApiData (ToHttpApiData, FromHttpApiData)
|
||||
import Data.ByteArray (ByteArrayAccess)
|
||||
|
||||
import qualified Data.ByteString.Base64.URL as Base64
|
||||
import qualified Data.ByteArray as ByteArray
|
||||
import qualified Network.Minio as Minio (Object)
|
||||
import qualified Crypto.Hash as Crypto (digestFromByteString)
|
||||
|
||||
import Utils.Lens.TH
|
||||
|
||||
|
||||
data File = File
|
||||
|
||||
newtype FileContentChunkReference = FileContentChunkReference (Digest SHA3_512)
|
||||
deriving (Eq, Ord, Read, Show, Lift, Generic, Typeable)
|
||||
deriving newtype ( PersistField, PersistFieldSql
|
||||
, PathPiece, ToHttpApiData, FromHttpApiData, ToJSON, FromJSON
|
||||
, Hashable, NFData
|
||||
, ByteArrayAccess
|
||||
)
|
||||
|
||||
makeWrapped ''FileContentChunkReference
|
||||
|
||||
newtype FileContentReference = FileContentReference (Digest SHA3_512)
|
||||
deriving (Eq, Ord, Read, Show, Lift, Generic, Typeable)
|
||||
deriving newtype ( PersistField, PersistFieldSql
|
||||
, PathPiece, ToHttpApiData, FromHttpApiData, ToJSON, FromJSON
|
||||
, Hashable, NFData
|
||||
, ByteArrayAccess
|
||||
)
|
||||
|
||||
makeWrapped ''FileContentReference
|
||||
|
||||
|
||||
minioFileReference :: Prism' Minio.Object FileContentReference
|
||||
minioFileReference = prism' toObjectName fromObjectName
|
||||
where toObjectName = decodeUtf8 . Base64.encodeUnpadded . ByteArray.convert
|
||||
fromObjectName = fmap (review _Wrapped) . Crypto.digestFromByteString <=< preview _Right . Base64.decodeUnpadded . encodeUtf8
|
||||
|
||||
|
||||
data File m = File
|
||||
{ fileTitle :: FilePath
|
||||
, fileContent :: Maybe ByteString
|
||||
, fileContent :: Maybe (ConduitT () ByteString m ())
|
||||
, fileModified :: UTCTime
|
||||
} deriving (Eq, Ord, Read, Show, Generic, Typeable)
|
||||
} deriving (Generic, Typeable)
|
||||
|
||||
makeLenses_ ''File
|
||||
|
||||
transFile :: Monad m => (forall a. m a -> n a) -> (File m -> File n)
|
||||
transFile l File{..} = File{ fileContent = transPipe l <$> fileContent, .. }
|
||||
|
||||
data FileReference = FileReference
|
||||
{ fileReferenceTitle :: FilePath
|
||||
, fileReferenceContent :: Maybe FileContentReference
|
||||
|
||||
@ -172,6 +172,7 @@ data AppSettings = AppSettings
|
||||
, appUploadCacheConf :: Maybe Minio.ConnectInfo
|
||||
, appUploadCacheBucket :: Minio.Bucket
|
||||
, appInjectFiles :: Maybe NominalDiffTime
|
||||
, appFileUploadDBChunksize :: Int
|
||||
|
||||
, appFavouritesQuickActionsBurstsize
|
||||
, appFavouritesQuickActionsAvgInverseRate :: Word64
|
||||
@ -474,6 +475,7 @@ instance FromJSON AppSettings where
|
||||
appPruneUnreferencedFiles <- o .:? "prune-unreferenced-files"
|
||||
appKeepUnreferencedFiles <- o .:? "keep-unreferenced-files" .!= 0
|
||||
appInjectFiles <- o .:? "inject-files"
|
||||
appFileUploadDBChunksize <- o .: "file-upload-db-chunksize"
|
||||
|
||||
appMaximumContentLength <- o .: "maximum-content-length"
|
||||
|
||||
|
||||
27
src/Utils.hs
27
src/Utils.hs
@ -56,7 +56,8 @@ import Control.Arrow as Utils ((>>>))
|
||||
import Control.Monad.Trans.Except (ExceptT(..), throwE, runExceptT)
|
||||
import Control.Monad.Except (MonadError(..))
|
||||
import Control.Monad.Trans.Maybe as Utils (MaybeT(..))
|
||||
import Control.Monad.Trans.Writer.Lazy (WriterT, execWriterT, tell)
|
||||
import Control.Monad.Trans.Writer.Strict (execWriterT)
|
||||
import Control.Monad.Writer.Class (MonadWriter(..))
|
||||
import Control.Monad.Catch
|
||||
import Control.Monad.Morph (hoist)
|
||||
import Control.Monad.Fail
|
||||
@ -83,6 +84,9 @@ import qualified Crypto.Saltine.Class as Saltine
|
||||
import qualified Crypto.Data.PKCS7 as PKCS7
|
||||
import Crypto.MAC.KMAC (KMAC, HashSHAKE)
|
||||
import qualified Crypto.MAC.KMAC as KMAC
|
||||
import qualified Crypto.Hash as Crypto
|
||||
import Crypto.Hash (HashAlgorithm, Digest)
|
||||
import Crypto.Hash.Instances ()
|
||||
|
||||
import Data.ByteArray (ByteArrayAccess)
|
||||
|
||||
@ -843,7 +847,7 @@ diffTimeout timeoutLength timeoutRes act = fromMaybe timeoutRes <$> timeout time
|
||||
= let (MkFixed micro :: Micro) = realToFrac timeoutLength
|
||||
in fromInteger micro
|
||||
|
||||
tellM :: (Monad m, Monoid x) => m x -> WriterT x m ()
|
||||
tellM :: (MonadTrans t, MonadWriter x (t m), Monad m) => m x -> t m ()
|
||||
tellM = tell <=< lift
|
||||
|
||||
-------------
|
||||
@ -856,6 +860,19 @@ peekN n = do
|
||||
mapM_ leftover peeked
|
||||
return peeked
|
||||
|
||||
peekWhile :: forall a o m. Monad m => (a -> Bool) -> ConduitT a o m [a]
|
||||
peekWhile p = do
|
||||
let go acc = do
|
||||
next <- await
|
||||
case next of
|
||||
Nothing -> return (reverse acc, Nothing)
|
||||
Just x
|
||||
| p x -> go $ x : acc
|
||||
| otherwise -> return (reverse acc, Just x)
|
||||
(peeked, failed) <- go []
|
||||
mapM_ leftover $ peeked ++ hoistMaybe failed
|
||||
return peeked
|
||||
|
||||
anyMC, allMC :: forall a o m. Monad m => (a -> m Bool) -> ConduitT a o m Bool
|
||||
anyMC f = C.mapM f .| orC
|
||||
allMC f = C.mapM f .| andC
|
||||
@ -1057,6 +1074,12 @@ kmaclazy :: forall a string key ba chunk.
|
||||
-> KMAC a
|
||||
kmaclazy str k = KMAC.finalize . KMAC.updates (KMAC.initialize @a str k) . toChunks
|
||||
|
||||
emptyHash :: forall a. HashAlgorithm a => Q (TExp (Digest a))
|
||||
-- ^ Hash of `mempty`
|
||||
--
|
||||
-- Computationally preferrable to computing the hash at runtime
|
||||
emptyHash = TH.liftTyped $ Crypto.hashFinalize Crypto.hashInit
|
||||
|
||||
-------------
|
||||
-- Caching --
|
||||
-------------
|
||||
|
||||
@ -3,6 +3,7 @@ module Utils.Files
|
||||
, sinkFile', sinkFiles'
|
||||
, FileUploads
|
||||
, replaceFileReferences, replaceFileReferences'
|
||||
, sinkFileDB
|
||||
) where
|
||||
|
||||
import Import.NoFoundation
|
||||
@ -11,31 +12,46 @@ import Handler.Utils.Minio
|
||||
import qualified Network.Minio as Minio
|
||||
|
||||
import qualified Crypto.Hash as Crypto (hash)
|
||||
import qualified Crypto.Hash.Conduit as Crypto (sinkHash)
|
||||
|
||||
import qualified Data.Conduit.Combinators as C
|
||||
|
||||
import qualified Data.ByteString.Base64.URL as Base64
|
||||
import qualified Data.ByteArray as ByteArray
|
||||
import qualified Data.Conduit.List as C (unfoldM)
|
||||
|
||||
import qualified Data.Map.Lazy as Map
|
||||
import qualified Data.Set as Set
|
||||
import Control.Monad.State.Class (modify)
|
||||
|
||||
import qualified Data.Sequence as Seq
|
||||
|
||||
import Database.Persist.Sql (deleteWhereCount)
|
||||
|
||||
import Control.Monad.Trans.Resource (allocate)
|
||||
|
||||
import qualified Data.UUID.V4 as UUID
|
||||
|
||||
sinkFiles :: (MonadThrow m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m, BackendCompatible SqlBackend (YesodPersistBackend UniWorX), YesodPersist UniWorX) => ConduitT File FileReference (SqlPersistT m) ()
|
||||
sinkFiles = C.mapM sinkFile
|
||||
import qualified Database.Esqueleto as E
|
||||
import qualified Database.Esqueleto.Utils as E
|
||||
|
||||
|
||||
sinkFileDB :: (MonadCatch m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m, BackendCompatible SqlBackend (YesodPersistBackend UniWorX), YesodPersist UniWorX)
|
||||
=> ConduitT () ByteString (SqlPersistT m) () -> SqlPersistT m FileContentReference
|
||||
sinkFileDB fileContentContent = do
|
||||
dbChunksize <- getsYesod $ view _appFileUploadDBChunksize
|
||||
|
||||
let sinkChunk fileContentChunkContent = do
|
||||
fileChunkLockTime <- liftIO getCurrentTime
|
||||
fileChunkLockInstance <- getsYesod appInstanceID
|
||||
|
||||
tellM $ Set.singleton <$> insert FileChunkLock{ fileChunkLockHash = fileContentChunkHash, .. }
|
||||
lift . handleIfSql isUniqueConstraintViolation (const $ return ()) $
|
||||
insert_ FileContentChunk{..}
|
||||
return $ FileContentChunkKey fileContentChunkHash
|
||||
where fileContentChunkHash = _Wrapped # Crypto.hash fileContentChunkContent
|
||||
((review _Wrapped -> fileContentHash, fileContentChunks), chunkLocks) <- runConduit . runWriterC $ transPipe lift fileContentContent
|
||||
.| C.chunksOfE dbChunksize
|
||||
.| C.mapM (\c -> (c, ) <$> sinkChunk c)
|
||||
.| transPipe lift (getZipConduit $ (,) <$> ZipConduit (C.map (view _1) .| Crypto.sinkHash) <*> ZipConduit (C.foldMap $ views _2 Seq.singleton))
|
||||
|
||||
sinkFile :: (MonadThrow m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m, BackendCompatible SqlBackend (YesodPersistBackend UniWorX), YesodPersist UniWorX) => File -> SqlPersistT m FileReference
|
||||
sinkFile File{ fileContent = Nothing, .. } = return FileReference
|
||||
{ fileReferenceContent = Nothing
|
||||
, fileReferenceTitle = fileTitle
|
||||
, fileReferenceModified = fileModified
|
||||
}
|
||||
sinkFile File{ fileContent = Just fileContentContent, .. } = do
|
||||
void . withUnliftIO $ \UnliftIO{..} ->
|
||||
let takeLock = do
|
||||
fileLockTime <- liftIO getCurrentTime
|
||||
@ -44,35 +60,81 @@ sinkFile File{ fileContent = Just fileContentContent, .. } = do
|
||||
releaseLock lId = liftHandler . runDB $ (withReaderT projectBackend $ setSerializable (delete lId :: SqlPersistT (HandlerFor UniWorX) ()) :: YesodDB UniWorX ())
|
||||
in unliftIO $ allocate (unliftIO takeLock) (unliftIO . releaseLock)
|
||||
|
||||
inDB <- exists [ FileContentHash ==. fileContentHash ]
|
||||
deleteWhere [ FileChunkLockId <-. Set.toList chunkLocks ]
|
||||
|
||||
let sinkFileDB = unless inDB $ repsert (FileContentKey fileContentHash) FileContent{ fileContentUnreferencedSince = Nothing, .. }
|
||||
maybeT sinkFileDB $ do
|
||||
let uploadName = decodeUtf8 . Base64.encodeUnpadded $ ByteArray.convert fileContentHash
|
||||
uploadBucket <- getsYesod $ views appSettings appUploadCacheBucket
|
||||
unless inDB . runAppMinio $ do
|
||||
uploadExists <- handleIf minioIsDoesNotExist (const $ return False) $ True <$ Minio.statObject uploadBucket uploadName Minio.defaultGetObjectOptions
|
||||
unless uploadExists $ do
|
||||
let
|
||||
pooOptions = Minio.defaultPutObjectOptions
|
||||
{ Minio.pooCacheControl = Just "immutable"
|
||||
}
|
||||
Minio.putObject uploadBucket uploadName (C.sourceLazy $ fromStrict fileContentContent) (Just . fromIntegral $ olength fileContentContent) pooOptions
|
||||
-- Note that MinIO does not accept length zero uploads without an explicit length specification (not `Nothing` in the line above for the api we use)
|
||||
let entryExists = E.selectExists . E.from $ \fileContentEntry -> E.where_ $ fileContentEntry E.^. FileContentEntryHash E.==. E.val fileContentHash
|
||||
unlessM entryExists . void $
|
||||
insertMany_ [ FileContentEntry{ fileContentEntryHash = fileContentHash, .. }
|
||||
| fileContentEntryChunkHash <- otoList fileContentChunks
|
||||
| fileContentEntryIx <- [0..]
|
||||
]
|
||||
|
||||
return fileContentHash
|
||||
where fileContentChunkContentBased = False
|
||||
|
||||
|
||||
sinkFiles :: (MonadCatch m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m, BackendCompatible SqlBackend (YesodPersistBackend UniWorX), YesodPersist UniWorX) => ConduitT (File (SqlPersistT m)) FileReference (SqlPersistT m) ()
|
||||
sinkFiles = C.mapM sinkFile
|
||||
|
||||
sinkFile :: (MonadCatch m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m, BackendCompatible SqlBackend (YesodPersistBackend UniWorX), YesodPersist UniWorX) => File (SqlPersistT m) -> SqlPersistT m FileReference
|
||||
sinkFile File{ fileContent = Nothing, .. } = return FileReference
|
||||
{ fileReferenceContent = Nothing
|
||||
, fileReferenceTitle = fileTitle
|
||||
, fileReferenceModified = fileModified
|
||||
}
|
||||
sinkFile File{ fileContent = Just fileContentContent, .. } = do
|
||||
(unsealConduitT -> fileContentContent', isEmpty) <- fileContentContent $$+ is _Nothing <$> C.peekE
|
||||
|
||||
fileContentHash <- if
|
||||
| not isEmpty -> maybeT (sinkFileDB fileContentContent') $ do
|
||||
uploadBucket <- getsYesod $ views appSettings appUploadCacheBucket
|
||||
chunk <- liftIO newEmptyMVar
|
||||
let putChunks = do
|
||||
nextChunk <- await
|
||||
case nextChunk of
|
||||
Nothing
|
||||
-> putMVar chunk Nothing
|
||||
Just nextChunk'
|
||||
-> putMVar chunk (Just nextChunk') >> yield nextChunk' >> putChunks
|
||||
sinkAsync <- lift . allocateLinkedAsync . runConduit
|
||||
$ fileContentContent'
|
||||
.| putChunks
|
||||
.| Crypto.sinkHash
|
||||
|
||||
runAppMinio $ do
|
||||
tmpUUID <- liftIO UUID.nextRandom
|
||||
let uploadName = ".tmp." <> toPathPiece tmpUUID
|
||||
pooOptions = Minio.defaultPutObjectOptions
|
||||
{ Minio.pooCacheControl = Just "immutable"
|
||||
}
|
||||
Minio.putObject uploadBucket uploadName (C.unfoldM (\x -> fmap (, x) <$> takeMVar chunk) ()) Nothing pooOptions
|
||||
fileContentHash <- review _Wrapped <$> waitAsync sinkAsync
|
||||
let dstName = minioFileReference # fileContentHash
|
||||
copySrc = Minio.defaultSourceInfo
|
||||
{ Minio.srcBucket = uploadBucket, Minio.srcObject = uploadName
|
||||
}
|
||||
copyDst = Minio.defaultDestinationInfo
|
||||
{ Minio.dstBucket = uploadBucket
|
||||
, Minio.dstObject = dstName
|
||||
}
|
||||
uploadExists <- handleIf minioIsDoesNotExist (const $ return False) $ True <$ Minio.statObject uploadBucket dstName Minio.defaultGetObjectOptions
|
||||
unless uploadExists $
|
||||
Minio.copyObject copyDst copySrc
|
||||
Minio.removeObject uploadBucket uploadName
|
||||
return fileContentHash
|
||||
| otherwise -> return $$(liftTyped $ FileContentReference $$(emptyHash))
|
||||
|
||||
return FileReference
|
||||
{ fileReferenceContent = Just fileContentHash
|
||||
, fileReferenceTitle = fileTitle
|
||||
, fileReferenceModified = fileModified
|
||||
}
|
||||
where
|
||||
fileContentHash = Crypto.hash fileContentContent
|
||||
|
||||
|
||||
sinkFiles' :: (MonadThrow m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m, BackendCompatible SqlBackend (YesodPersistBackend UniWorX), YesodPersist UniWorX, HasFileReference record) => ConduitT (File, FileReferenceResidual record) record (SqlPersistT m) ()
|
||||
sinkFiles' :: (MonadCatch m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m, BackendCompatible SqlBackend (YesodPersistBackend UniWorX), YesodPersist UniWorX, HasFileReference record) => ConduitT (File (SqlPersistT m), FileReferenceResidual record) record (SqlPersistT m) ()
|
||||
sinkFiles' = C.mapM $ uncurry sinkFile'
|
||||
|
||||
sinkFile' :: (MonadThrow m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m, BackendCompatible SqlBackend (YesodPersistBackend UniWorX), YesodPersist UniWorX, HasFileReference record) => File -> FileReferenceResidual record -> SqlPersistT m record
|
||||
sinkFile' :: (MonadCatch m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m, BackendCompatible SqlBackend (YesodPersistBackend UniWorX), YesodPersist UniWorX, HasFileReference record) => File (SqlPersistT m) -> FileReferenceResidual record -> SqlPersistT m record
|
||||
sinkFile' file residual = do
|
||||
reference <- sinkFile file
|
||||
return $ _FileReference # (reference, residual)
|
||||
|
||||
@ -1,22 +1,32 @@
|
||||
module Utils.Sql
|
||||
( setSerializable, setSerializable'
|
||||
, catchSql, handleSql
|
||||
, isUniqueConstraintViolation
|
||||
, catchIfSql, handleIfSql
|
||||
) where
|
||||
|
||||
import ClassyPrelude.Yesod
|
||||
import ClassyPrelude.Yesod hiding (handle)
|
||||
import Numeric.Natural
|
||||
import Settings.Log
|
||||
|
||||
import Database.PostgreSQL.Simple (SqlError)
|
||||
import Database.PostgreSQL.Simple (SqlError(..))
|
||||
import Database.PostgreSQL.Simple.Errors (isSerializationError)
|
||||
import Control.Monad.Catch (MonadMask)
|
||||
import Control.Monad.Catch
|
||||
|
||||
import Database.Persist.Sql
|
||||
import Database.Persist.Sql.Raw.QQ
|
||||
|
||||
import qualified Data.ByteString as ByteString
|
||||
|
||||
import Control.Retry
|
||||
|
||||
import Control.Lens ((&))
|
||||
|
||||
import qualified Data.UUID as UUID
|
||||
import Control.Monad.Random.Class (MonadRandom(getRandom))
|
||||
|
||||
import Text.Shakespeare.Text (st)
|
||||
|
||||
|
||||
setSerializable :: forall m a. (MonadLogger m, MonadMask m, MonadIO m, ReadLogSettings (SqlPersistT m)) => SqlPersistT m a -> SqlPersistT m a
|
||||
setSerializable = setSerializable' $ fullJitterBackoff 1e3 & limitRetriesByCumulativeDelay 10e6
|
||||
@ -54,5 +64,29 @@ setSerializable' policy act = do
|
||||
transactionSaveWithIsolation ReadCommitted
|
||||
return res
|
||||
|
||||
catchSql :: forall m a. (MonadCatch m, MonadIO m) => SqlPersistT m a -> (SqlError -> SqlPersistT m a) -> SqlPersistT m a
|
||||
catchSql = flip handleSql
|
||||
|
||||
handleSql :: forall m a. (MonadCatch m, MonadIO m) => (SqlError -> SqlPersistT m a) -> SqlPersistT m a -> SqlPersistT m a
|
||||
handleSql recover act = do
|
||||
savepointName <- liftIO $ UUID.toString <$> getRandom
|
||||
|
||||
let recover' :: SqlError -> SqlPersistT m a
|
||||
recover' exc = do
|
||||
rawExecute [st|ROLLBACK TO SAVEPOINT "#{savepointName}"|] []
|
||||
recover exc
|
||||
|
||||
handle recover' $ do
|
||||
rawExecute [st|SAVEPOINT "#{savepointName}"|] []
|
||||
res <- act
|
||||
rawExecute [st|RELEASE SAVEPOINT "#{savepointName}"|] []
|
||||
return res
|
||||
|
||||
catchIfSql :: forall m a. (MonadCatch m, MonadIO m) => (SqlError -> Bool) -> SqlPersistT m a -> (SqlError -> SqlPersistT m a) -> SqlPersistT m a
|
||||
catchIfSql p = flip $ handleIfSql p
|
||||
|
||||
handleIfSql :: forall m a. (MonadCatch m, MonadIO m) => (SqlError -> Bool) -> (SqlError -> SqlPersistT m a) -> SqlPersistT m a -> SqlPersistT m a
|
||||
handleIfSql p recover = handleSql (\err -> bool throwM recover (p err) err)
|
||||
|
||||
isUniqueConstraintViolation :: SqlError -> Bool
|
||||
isUniqueConstraintViolation SqlError{..} = "duplicate key value violates unique constraint" `ByteString.isPrefixOf` sqlErrorMsg
|
||||
|
||||
@ -5,7 +5,6 @@ module Database.Fill
|
||||
import "uniworx" Import hiding (Option(..), currentYear)
|
||||
import Handler.Utils.Form (SheetGrading'(..), SheetType'(..), SheetGroup'(..))
|
||||
|
||||
import qualified Data.ByteString as BS
|
||||
import qualified Data.ByteString.Lazy as LBS
|
||||
import qualified Data.Text as Text
|
||||
-- import Data.Text.IO (hPutStrLn)
|
||||
@ -30,6 +29,8 @@ import qualified Data.Csv as Csv
|
||||
import Crypto.Random (getRandomBytes)
|
||||
import Data.List (genericLength)
|
||||
|
||||
import qualified Data.Conduit.Combinators as C
|
||||
|
||||
|
||||
testdataDir :: FilePath
|
||||
testdataDir = "testdata"
|
||||
@ -37,7 +38,7 @@ testdataDir = "testdata"
|
||||
|
||||
insertFile :: ( HasFileReference fRef, PersistRecordBackend fRef SqlBackend ) => FileReferenceResidual fRef -> FilePath -> DB (Key fRef)
|
||||
insertFile residual fileTitle = do
|
||||
fileContent <- liftIO . fmap Just . BS.readFile $ testdataDir </> fileTitle
|
||||
let fileContent = Just . C.sourceFile $ testdataDir </> fileTitle
|
||||
fileModified <- liftIO getCurrentTime
|
||||
sinkFile' File{..} residual >>= insert
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user