Merge branch '631-content-dependent-chunking' into master
commit 7d559cc0f1
@ -35,7 +35,8 @@ bearer-expiration: 604800
bearer-encoding: HS256
maximum-content-length: "_env:MAX_UPLOAD_SIZE:134217728"
session-files-expire: 3600
prune-unreferenced-files: 28800
prune-unreferenced-files-within: 57600
prune-unreferenced-files-interval: 3600
keep-unreferenced-files: 86400
health-check-interval:
  matching-cluster-config: "_env:HEALTHCHECK_INTERVAL_MATCHING_CLUSTER_CONFIG:600"
@ -158,7 +159,13 @@ upload-cache:
auto-discover-region: "_env:UPLOAD_S3_AUTO_DISCOVER_REGION:true"
disable-cert-validation: "_env:UPLOAD_S3_DISABLE_CERT_VALIDATION:false"
upload-cache-bucket: "uni2work-uploads"
inject-files: 10

inject-files: 307
rechunk-files: 601

file-upload-db-chunksize: 4194304 # 4MiB
file-chunking-target-exponent: 21 # 2MiB
file-chunking-hash-window: 4096

server-sessions:
idle-timeout: 28807
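The chunking parameters above relate as follows: file-chunking-target-exponent: 21 aims for content-defined chunks of about 2^21 bytes = 2 MiB, file-chunking-hash-window: 4096 is the rolling-hash window in bytes, and file-upload-db-chunksize: 4194304 (4 MiB) bounds how much of a stored chunk is read per database round trip. A minimal sketch of the arithmetic (the names here are illustrative, not from the codebase):

  -- Illustrative only: how the exponent and window map to byte sizes.
  targetChunkSize :: Int
  targetChunkSize = 2 ^ (21 :: Int)  -- 2097152 bytes = 2 MiB

  hashWindow, dbChunksize :: Int
  hashWindow  = 4096                 -- rolling-hash window
  dbChunksize = 4 * 1024 * 1024      -- 4194304 bytes = 4 MiB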
@ -229,6 +236,9 @@ token-buckets:
  depth: 1572864000 # 1500MiB
  inv-rate: 1.9e-6 # 2MiB/s
  initial-value: 0

rechunk-files:
  depth: 20971520 # 20MiB
  inv-rate: 9.5e-7 # 1MiB/s
  initial-value: 0

fallback-personalised-sheet-files-keys-expire: 2419200
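Reading the comments, inv-rate appears to be the bucket's refill cost in seconds per byte, so the sustained rate is 1 / inv-rate: 9.5e-7 s/B ≈ 1/2^20, i.e. roughly 1 MiB/s, matching its comment. Under the same reading, 1.9e-6 s/B comes out to about 0.5 MiB/s rather than the commented 2 MiB/s, so either the value or the comment may be stale. A sketch of the conversion (the interpretation of inv-rate is inferred from the comments, not confirmed by this diff):

  -- Assumed interpretation: inv-rate is seconds per byte.
  rateBytesPerSecond :: Double -> Double
  rateBytesPerSecond invRate = 1 / invRate

  -- rateBytesPerSecond 9.5e-7 ≈ 1052631 B/s (≈ 1 MiB/s)
  -- rateBytesPerSecond 1.9e-6 ≈  526315 B/s (≈ 0.5 MiB/s)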
@ -10,4 +10,5 @@ log-settings:
auth-dummy-login: true
server-session-acid-fallback: true

job-cron-interval: null
job-workers: 1
@ -1,9 +1,20 @@
FileContent
FileContentEntry
  hash FileContentReference
  ix Natural
  chunkHash FileContentChunkId
  UniqueFileContentEntry hash ix

FileContentChunk
  hash FileContentChunkReference
  content ByteString
  unreferencedSince UTCTime Maybe
  contentBased Bool default=false -- For Migration
  Primary hash

FileContentChunkUnreferenced
  hash FileContentChunkId
  since UTCTime
  UniqueFileContentChunkUnreferenced hash

SessionFile
  content FileContentReference Maybe
  touched UTCTime
@ -12,3 +23,8 @@ FileLock
  content FileContentReference
  instance InstanceId
  time UTCTime

FileChunkLock
  hash FileContentChunkReference
  instance InstanceId
  time UTCTime
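The new schema is content-addressed at two levels: a file (FileContentReference) is an ordered list of FileContentEntry rows (ordered by ix), each pointing at a deduplicated FileContentChunk keyed by its own hash. Reassembling a file then just follows the entries in order. A minimal self-contained sketch with simplified types (this is not the project's actual query code):

  import Data.ByteString (ByteString)
  import qualified Data.Map.Strict as Map

  type ChunkHash = ByteString
  type FileHash  = ByteString

  -- entries: FileContentEntry rows keyed by file hash, chunk hashes in ix order
  -- chunks:  FileContentChunk rows keyed by chunk hash
  reassemble
    :: Map.Map FileHash [ChunkHash]
    -> Map.Map ChunkHash ByteString
    -> FileHash
    -> Maybe ByteString
  reassemble entries chunks h = do
    hashes <- Map.lookup h entries
    mconcat <$> traverse (`Map.lookup` chunks) hashes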
@ -151,6 +151,7 @@ dependencies:
- minio-hs
- network-ip
- data-textual
- fastcdc

other-extensions:
- GeneralizedNewtypeDeriving
@ -227,13 +227,15 @@ makeFoundation appSettings'@AppSettings{..} = do
forM_ ldapPool $ registerFailoverMetrics "ldap"

-- Perform database migration using our application's logging settings.
if
| appAutoDbMigrate -> do
$logDebugS "setup" "Migration"
migrateAll `runSqlPool` sqlPool
| otherwise -> whenM (requiresMigration `runSqlPool` sqlPool) $ do
$logErrorS "setup" "Migration required"
liftIO . exitWith $ ExitFailure 2
flip runReaderT tempFoundation $
if
| appAutoDbMigrate -> do
$logDebugS "setup" "Migration"
migrateAll `runSqlPool` sqlPool
| otherwise -> whenM (requiresMigration `runSqlPool` sqlPool) $ do
$logErrorS "setup" "Migration required"
liftIO . exitWith $ ExitFailure 2

$logDebugS "setup" "Cluster-Config"
appCryptoIDKey <- clusterSetting (Proxy :: Proxy 'ClusterCryptoIDKey) `runSqlPool` sqlPool
appSecretBoxKey <- clusterSetting (Proxy :: Proxy 'ClusterSecretBoxKey) `runSqlPool` sqlPool
@ -18,6 +18,8 @@ import Data.Aeson as Aeson

import Control.Monad.Fail

import Language.Haskell.TH.Syntax (Lift(liftTyped))
import Instances.TH.Lift ()

instance HashAlgorithm hash => PersistField (Digest hash) where
toPersistValue = PersistByteString . convert
@ -46,3 +48,6 @@ instance HashAlgorithm hash => FromJSON (Digest hash) where

instance Hashable (Digest hash) where
hashWithSalt s = (hashWithSalt s :: ByteString -> Int) . convert

instance HashAlgorithm hash => Lift (Digest hash) where
liftTyped dgst = [||fromMaybe (error "Lifted digest has wrong length") $ digestFromByteString $$(liftTyped (convert dgst :: ByteString))||]
@ -6,6 +6,7 @@ module Database.Esqueleto.Utils
, justVal, justValList
, isJust
, isInfixOf, hasInfix
, strConcat, substring
, or, and
, any, all
, subSelectAnd, subSelectOr
@ -39,7 +40,8 @@ import qualified Data.Set as Set
import qualified Data.List as List
import qualified Data.Foldable as F
import qualified Database.Esqueleto as E
import qualified Database.Esqueleto.Internal.Sql as E
import qualified Database.Esqueleto.PostgreSQL as E
import qualified Database.Esqueleto.Internal.Internal as E
import Database.Esqueleto.Utils.TH

import qualified Data.Text.Lazy as Lazy (Text)
@ -96,6 +98,42 @@ hasInfix :: ( E.SqlString s1
=> E.SqlExpr (E.Value s2) -> E.SqlExpr (E.Value s1) -> E.SqlExpr (E.Value Bool)
hasInfix = flip isInfixOf

infixl 6 `strConcat`

strConcat :: E.SqlString s
=> E.SqlExpr (E.Value s) -> E.SqlExpr (E.Value s) -> E.SqlExpr (E.Value s)
strConcat = E.unsafeSqlBinOp " || "

substring :: ( E.SqlString str
, Num from, Num for
)
=> E.SqlExpr (E.Value str)
-> E.SqlExpr (E.Value from)
-> E.SqlExpr (E.Value for)
-> E.SqlExpr (E.Value str)
substring (E.ERaw p1 f1) (E.ERaw p2 f2) (E.ERaw p3 f3)
= E.ERaw E.Never $ \info ->
let (strTLB, strVals) = f1 info
(fromiTLB, fromiVals) = f2 info
(foriTLB, foriVals) = f3 info
in ( "SUBSTRING" <> E.parens (E.parensM p1 strTLB <> " FROM " <> E.parensM p2 fromiTLB <> " FOR " <> E.parensM p3 foriTLB)
, strVals <> fromiVals <> foriVals
)
substring a b c = substring (construct a) (construct b) (construct c)
where construct :: E.SqlExpr (E.Value a) -> E.SqlExpr (E.Value a)
construct (E.ERaw p f) = E.ERaw E.Parens $ \info ->
let (b1, vals) = f info
build ("?", [E.PersistList vals']) =
(E.uncommas $ replicate (length vals') "?", vals')
build expr = expr
in build (E.parensM p b1, vals)
construct (E.ECompositeKey f) =
E.ERaw E.Parens $ \info -> (E.uncommas $ f info, mempty)
construct (E.EAliasedValue i _) =
E.ERaw E.Never $ E.aliasedValueIdentToRawSql i
construct (E.EValueReference i i') =
E.ERaw E.Never $ E.valueReferenceToRawSql i i'

and, or :: Foldable f => f (E.SqlExpr (E.Value Bool)) -> E.SqlExpr (E.Value Bool)
and = F.foldr (E.&&.) true
or = F.foldr (E.||.) false
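For reference, the generated SQL uses PostgreSQL's SUBSTRING(string FROM start FOR count) form, which is 1-indexed. An illustrative rendering (the table and column names are examples):

  -- E.substring (chunk E.^. FileContentChunkContent) (E.val 1) (E.val 4194304)
  -- renders roughly as:
  --   SUBSTRING("file_content_chunk"."content" FROM 1 FOR 4194304)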
@ -111,8 +149,11 @@ all :: MonoFoldable f => (Element f -> E.SqlExpr (E.Value Bool)) -> f -> E.SqlEx
all test = and . map test . otoList

subSelectAnd, subSelectOr :: E.SqlQuery (E.SqlExpr (E.Value Bool)) -> E.SqlExpr (E.Value Bool)
subSelectAnd q = E.subSelectUnsafe $ E.unsafeSqlFunction "bool_and" <$> q
subSelectOr q = E.subSelectUnsafe $ E.unsafeSqlFunction "bool_or" <$> q
subSelectAnd q = parens . E.subSelectUnsafe $ flip (E.unsafeSqlAggregateFunction "bool_and" E.AggModeAll) [] <$> q
subSelectOr q = parens . E.subSelectUnsafe $ flip (E.unsafeSqlAggregateFunction "bool_or" E.AggModeAll) [] <$> q

parens :: E.SqlExpr (E.Value a) -> E.SqlExpr (E.Value a)
parens = E.unsafeSqlFunction ""

-- Allow usage of Tuples as DbtRowKey, i.e. SqlIn instances for tuples
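The parens helper piggybacks on unsafeSqlFunction with an empty function name, so an expression e renders as "" followed by the parenthesised argument list, i.e. (e); this is what lets the rewritten subSelectAnd/subSelectOr wrap the aggregate sub-select in parentheses. Illustrative renderings (assumed from the combinators used, not taken from this diff):

  -- parens (E.val (1 :: Int))  renders roughly as:  (1)
  -- subSelectOr q              renders roughly as:  ((SELECT bool_or(...) ...))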
@ -7,7 +7,7 @@ module Foundation.Type
, _SessionStorageMemcachedSql, _SessionStorageAcid
, SMTPPool
, _appSettings', _appStatic, _appConnPool, _appSmtpPool, _appLdapPool, _appWidgetMemcached, _appHttpManager, _appLogger, _appLogSettings, _appCryptoIDKey, _appClusterID, _appInstanceID, _appJobState, _appSessionStore, _appSecretBoxKey, _appJSONWebKeySet, _appHealthReport
, DB, Form, MsgRenderer, MailM
, DB, Form, MsgRenderer, MailM, DBFile
) where

import Import.NoFoundation
@ -81,3 +81,4 @@ type DB = YesodDB UniWorX
type Form x = Html -> MForm (HandlerFor UniWorX) (FormResult x, WidgetFor UniWorX ())
type MsgRenderer = MsgRendererS UniWorX -- see Utils
type MailM a = MailT (HandlerFor UniWorX) a
type DBFile = File (YesodDB UniWorX)
@ -80,12 +80,12 @@ testDownload = do
sourceDBChunks :: ConduitT () Int DB ()
sourceDBChunks = forever sourceDBFiles
.| C.mapM (\x -> x <$ $logDebugS "testDownload.sourceDBChunks" (tshow $ entityKey x))
.| C.map ((length $!!) . fileContentContent . entityVal)
.| C.map ((length $!!) . fileContentChunkContent . entityVal)
.| takeLimit dlMaxSize
where
sourceDBFiles = E.selectSource . E.from $ \fileContent -> do
sourceDBFiles = E.selectSource . E.from $ \fileContentChunk -> do
E.orderBy [E.asc $ E.random_ @Int64]
return fileContent
return fileContentChunk

takeLimit n | n <= 0 = return ()
takeLimit n = do
@ -55,7 +55,7 @@ data PersonalisedSheetFileUnresolved a
= PSFUnresolvedDirectory a
| PSFUnresolvedCollatable Text a
| PSFUnresolved a
deriving (Eq, Ord, Read, Show, Generic, Typeable)
deriving (Eq, Ord, Read, Show, Functor, Foldable, Traversable, Generic, Typeable)

makePrisms ''PersonalisedSheetFileUnresolved
@ -195,7 +195,7 @@ sourcePersonalisedSheetFiles :: forall m.
-> Maybe SheetId
-> Maybe (Set UserId)
-> PersonalisedSheetFilesDownloadAnonymous
-> ConduitT () (Either PersonalisedSheetFile File) (SqlPersistT m) ()
-> ConduitT () (Either PersonalisedSheetFile DBFile) (SqlPersistT m) ()
sourcePersonalisedSheetFiles cId mbsid mbuids anonMode = do
(mbIdx, cIDKey) <- lift . newPersonalisedFilesKey $ maybe (Left cId) Right mbsid
let
@ -255,9 +255,10 @@ sourcePersonalisedSheetFiles cId mbsid mbuids anonMode = do
, fileModified = courseParticipantRegistration
}
yieldM . fmap Right $ do
fileContent <- lift $ Just . toStrict <$> formatPersonalisedSheetFilesMeta anonMode cPart cID
fileContent' <- lift $ formatPersonalisedSheetFilesMeta anonMode cPart cID
let fileTitle = (dirName <//>) . ensureExtension "yaml" . unpack . mr $ MsgPersonalisedSheetFilesMetaFilename cID
fileModified = courseParticipantRegistration
fileContent = Just $ C.sourceLazy fileContent'
return File{..}
_dirCache %= Set.insert dirName
whenIsJust mbPFile $ \(Entity _ pFile@PersonalisedSheetFile{..}) -> do
@ -11,8 +11,6 @@ import Handler.Utils.Submission

import qualified Data.Set as Set

import qualified Data.Text.Encoding as Text

import qualified Database.Esqueleto as E

import qualified Data.Conduit.Combinators as Conduit
@ -32,9 +30,8 @@ getSubDownloadR tid ssh csh shn cID (submissionFileTypeIsUpdate -> isUpdate) pat

case isRating of
True
| isUpdate -> runDB $ do
file <- runMaybeT $ lift . ratingFile cID =<< MaybeT (getRating submissionID)
maybe notFound (return . toTypedContent . Text.decodeUtf8) $ fileContent =<< file
| isUpdate -> maybe notFound sendThisFile <=< runDB . runMaybeT $
lift . ratingFile cID =<< MaybeT (getRating submissionID)
| otherwise -> notFound
False -> do
let results = (.| Conduit.map entityVal) . E.selectSource . E.from $ \sf -> do
@ -34,11 +34,13 @@ import Control.Monad.Logger

-- | Simply send a `File` value
sendThisFile :: File -> Handler TypedContent
sendThisFile :: DBFile -> Handler TypedContent
sendThisFile File{..}
| Just fileContent' <- fileContent = do
setContentDisposition' . Just $ takeFileName fileTitle
return $ TypedContent (simpleContentType (mimeLookup $ pack fileTitle) <> "; charset=utf-8") (toContent fileContent')
let cType = simpleContentType (mimeLookup $ pack fileTitle) <> "; charset=utf-8"
respondSourceDB cType $
fileContent' .| Conduit.map toFlushBuilder
| otherwise = sendResponseStatus noContent204 ()

-- | Serve a single file, identified through a given DB query
@ -46,7 +48,7 @@ serveOneFile :: forall file. HasFileReference file => ConduitT () file (YesodDB
serveOneFile source = do
results <- runDB . runConduit $ source .| Conduit.take 2 -- We don't need more than two files to make a decision below
case results of
[file] -> sendThisFile =<< runDB (sourceFile' file)
[file] -> sendThisFile $ sourceFile' file
[] -> notFound
_other -> do
$logErrorS "SFileR" "Multiple matching files found."
@ -58,7 +60,7 @@ serveOneFile source = do
serveSomeFiles :: forall file. HasFileReference file => FilePath -> ConduitT () file (YesodDB UniWorX) () -> Handler TypedContent
serveSomeFiles archiveName source = serveSomeFiles' archiveName $ source .| C.map Left

serveSomeFiles' :: forall file. HasFileReference file => FilePath -> ConduitT () (Either file File) (YesodDB UniWorX) () -> Handler TypedContent
serveSomeFiles' :: forall file. HasFileReference file => FilePath -> ConduitT () (Either file DBFile) (YesodDB UniWorX) () -> Handler TypedContent
serveSomeFiles' archiveName source = do
(source', results) <- runDB $ runPeekN 2 source
@ -66,7 +68,7 @@ serveSomeFiles' archiveName source = do

case results of
[] -> notFound
[file] -> sendThisFile =<< either (runDB . sourceFile') return file
[file] -> sendThisFile $ either sourceFile' id file
_moreFiles -> do
setContentDisposition' $ Just archiveName
respondSourceDB typeZip $ do
@ -79,7 +81,7 @@ serveSomeFiles' archiveName source = do
serveZipArchive :: forall file. HasFileReference file => FilePath -> ConduitT () file (YesodDB UniWorX) () -> Handler TypedContent
serveZipArchive archiveName source = serveZipArchive' archiveName $ source .| C.map Left

serveZipArchive' :: forall file. HasFileReference file => FilePath -> ConduitT () (Either file File) (YesodDB UniWorX) () -> Handler TypedContent
serveZipArchive' :: forall file. HasFileReference file => FilePath -> ConduitT () (Either file DBFile) (YesodDB UniWorX) () -> Handler TypedContent
serveZipArchive' archiveName source = do
(source', results) <- runDB $ runPeekN 1 source
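The recurring File → DBFile / File m change throughout this diff reflects file contents becoming a conduit source instead of an in-memory strict ByteString, which is why sendThisFile now streams via respondSourceDB. A sketch of the type this implies (assumed from the usage sites; the actual definition is not part of this diff):

  import Data.ByteString (ByteString)
  import Data.Conduit (ConduitT)
  import Data.Time.Clock (UTCTime)

  -- Assumed shape of the parametrised File type.
  data File m = File
    { fileTitle    :: FilePath
    , fileModified :: UTCTime
    , fileContent  :: Maybe (ConduitT () ByteString m ())  -- Nothing marks a directory
    }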
@ -276,7 +276,7 @@ storeAllocationResult :: AllocationId
-> (AllocationFingerprint, Set (UserId, CourseId), Seq MatchingLogRun)
-> DB ()
storeAllocationResult allocId now (allocFp, allocMatchings, ppMatchingLog -> allocLog) = do
FileReference{..} <- sinkFile $ File "matchings.log" (Just $ encodeUtf8 allocLog) now
FileReference{..} <- sinkFile $ File "matchings.log" (Just . yield $ encodeUtf8 allocLog) now
insert_ . AllocationMatching allocId allocFp now $ fromMaybe (error "allocation result stored without fileReferenceContent") fileReferenceContent

doAllocation allocId now allocMatchings
@ -2,7 +2,7 @@

module Handler.Utils.DateTime
( utcToLocalTime, utcToZonedTime
, localTimeToUTC, TZ.LocalToUTCResult(..)
, localTimeToUTC, TZ.LocalToUTCResult(..), localTimeToUTCSimple
, toTimeOfDay
, toMidnight, beforeMidnight, toMidday, toMorning
, formatDiffDays
@ -47,6 +47,9 @@ utcToZonedTime = ZonedTime <$> TZ.utcToLocalTimeTZ appTZ <*> TZ.timeZoneForUTCTi
localTimeToUTC :: LocalTime -> LocalToUTCResult
localTimeToUTC = TZ.localTimeToUTCFull appTZ

localTimeToUTCSimple :: LocalTime -> UTCTime
localTimeToUTCSimple = TZ.localTimeToUTCTZ appTZ

-- | Local midnight of given day
toMidnight :: Day -> UTCTime
toMidnight = toTimeOfDay 0 0 0
@ -2,17 +2,22 @@ module Handler.Utils.Files
( sourceFile, sourceFile'
, sourceFiles, sourceFiles'
, SourceFilesException(..)
, sourceFileDB
, acceptFile
) where

import Import

import qualified Data.Conduit.Combinators as C
import qualified Data.Conduit.List as C (unfoldM)

import Handler.Utils.Minio
import qualified Network.Minio as Minio

import qualified Data.ByteString.Base64.URL as Base64
import qualified Data.ByteArray as ByteArray
import qualified Database.Esqueleto as E
import qualified Database.Esqueleto.Utils as E

import System.FilePath (normalise)

data SourceFilesException
@ -22,36 +27,72 @@ data SourceFilesException
deriving anyclass (Exception)

sourceFiles :: ConduitT FileReference File (YesodDB UniWorX) ()
sourceFiles = C.mapM sourceFile
sourceFileDB :: (MonadThrow m, MonadHandler m, HandlerSite m ~ UniWorX)
=> FileContentReference -> ConduitT () ByteString (SqlPersistT m) ()
sourceFileDB fileReference = do
dbChunksize <- getsYesod $ view _appFileUploadDBChunksize
let retrieveChunk chunkHash = \case
Nothing -> return Nothing
Just start -> do
chunk <- E.selectMaybe . E.from $ \fileContentChunk -> do
E.where_ $ fileContentChunk E.^. FileContentChunkId E.==. E.val chunkHash
return $ E.substring (fileContentChunk E.^. FileContentChunkContent) (E.val start) (E.val dbChunksize)
case chunk of
Nothing -> throwM SourceFilesContentUnavailable
Just (E.Value c) -> return . Just . (c, ) $ if
| olength c >= dbChunksize -> Just $ start + dbChunksize
| otherwise -> Nothing
chunkHashes = E.selectSource . E.from $ \fileContentEntry -> do
E.where_ $ fileContentEntry E.^. FileContentEntryHash E.==. E.val fileReference
E.orderBy [ E.asc $ fileContentEntry E.^. FileContentEntryIx ]
return $ fileContentEntry E.^. FileContentEntryChunkHash
chunkHashes .| C.map E.unValue .| awaitForever (\chunkHash -> C.unfoldM (retrieveChunk chunkHash) $ Just (1 :: Int))

sourceFile :: FileReference -> DB File
sourceFile FileReference{..} = do
mFileContent <- traverse get $ FileContentKey <$> fileReferenceContent
fileContent <- if
| is (_Just . _Nothing) mFileContent
, Just fileContentHash <- fileReferenceContent -- Not a restriction
-> maybeT (throwM SourceFilesContentUnavailable) $ do
let uploadName = decodeUtf8 . Base64.encodeUnpadded $ ByteArray.convert fileContentHash
uploadBucket <- getsYesod $ views appSettings appUploadCacheBucket
fmap Just . hoistMaybe <=< runAppMinio . runMaybeT $ do
objRes <- catchIfMaybeT minioIsDoesNotExist $ Minio.getObject uploadBucket uploadName Minio.defaultGetObjectOptions
lift . runConduit $ Minio.gorObjectStream objRes .| C.fold
| fmap (fmap fileContentHash) mFileContent /= fmap Just fileReferenceContent
-> throwM SourceFilesMismatchedHashes
| Just fileContent' <- fileContentContent <$> join mFileContent
-> return $ Just fileContent'
| otherwise
-> return Nothing

return File
{ fileTitle = fileReferenceTitle
, fileContent
, fileModified = fileReferenceModified
}
sourceFiles :: Monad m => ConduitT FileReference DBFile m ()
sourceFiles = C.map sourceFile

sourceFiles' :: forall file. HasFileReference file => ConduitT file File (YesodDB UniWorX) ()
sourceFiles' = C.mapM sourceFile'
sourceFile :: FileReference -> DBFile
sourceFile FileReference{..} = File
{ fileTitle = fileReferenceTitle
, fileModified = fileReferenceModified
, fileContent = toFileContent <$> fileReferenceContent
}
where
toFileContent fileReference
| fileReference == $$(liftTyped $ FileContentReference $$(emptyHash))
= return ()
toFileContent fileReference = do
inDB <- lift . E.selectExists . E.from $ \fileContentEntry -> E.where_ $ fileContentEntry E.^. FileContentEntryHash E.==. E.val fileReference
if
| inDB -> sourceFileDB fileReference
| otherwise -> do
chunkVar <- newEmptyTMVarIO
minioAsync <- lift . allocateLinkedAsync $
maybeT (throwM SourceFilesContentUnavailable) $ do
let uploadName = minioFileReference # fileReference
uploadBucket <- getsYesod $ views appSettings appUploadCacheBucket
hoistMaybe <=< runAppMinio . runMaybeT $ do
objRes <- catchIfMaybeT minioIsDoesNotExist $ Minio.getObject uploadBucket uploadName Minio.defaultGetObjectOptions
lift . runConduit $ Minio.gorObjectStream objRes .| C.mapM_ (atomically . putTMVar chunkVar . Just)
atomically $ putTMVar chunkVar Nothing
let go = do
mChunk <- atomically $ takeTMVar chunkVar -- take (not read), so the producer can publish the next chunk
case mChunk of
Nothing -> waitAsync minioAsync
Just chunk -> yield chunk >> go
in go

sourceFile' :: forall file. HasFileReference file => file -> DB File
sourceFiles' :: forall file m. (HasFileReference file, Monad m) => ConduitT file DBFile m ()
sourceFiles' = C.map sourceFile'

sourceFile' :: forall file. HasFileReference file => file -> DBFile
sourceFile' = sourceFile . view (_FileReference . _1)

acceptFile :: (MonadResource m, MonadResource m') => FileInfo -> m (File m')
acceptFile fInfo = do
let fileTitle = dropWhile isPathSeparator . dropTrailingPathSeparator . normalise . unpack $ fileName fInfo
fileContent = Just $ fileSource fInfo
fileModified <- liftIO getCurrentTime
return File{..}
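sourceFileDB streams each stored chunk in dbChunksize-sized slices via SQL SUBSTRING, which is 1-indexed: the first slice starts at position 1, and a further slice is requested only while full-length slices keep coming back. A worked example with the 4 MiB setting from the configuration above (offsets illustrative):

  -- For a stored chunk of 5 MiB (5242880 bytes) and dbChunksize = 4194304:
  --   retrieveChunk h (Just 1)       -> bytes [1 .. 4194304], next start = 4194305
  --   retrieveChunk h (Just 4194305) -> bytes [4194305 .. 5242880]; shorter than
  --                                     dbChunksize, so the unfold stops (Nothing)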
@ -32,7 +32,7 @@ import Yesod.Form.Bootstrap3

import Handler.Utils.Zip
import qualified Data.Conduit.Combinators as C
import qualified Data.Conduit.List as C (mapMaybe)
import qualified Data.Conduit.List as C (mapMaybe, mapMaybeM)

import qualified Database.Esqueleto as E
import qualified Database.Esqueleto.Utils as E
@ -831,7 +831,10 @@ pseudonymWordField = checkMMap doCheck id $ ciField & addDatalist (return $ mkOp

uploadContents :: (MonadHandler m, HandlerSite m ~ UniWorX) => ConduitT FileReference ByteString m ()
uploadContents = transPipe (liftHandler . runDB) sourceFiles .| C.mapMaybe fileContent
uploadContents = transPipe (liftHandler . runDB) sourceFiles .| C.mapMaybeM fileContent'
where fileContent' f = runMaybeT $ do
File{fileContent = Just fc} <- return f
liftHandler . runDB . runConduit $ fc .| C.fold

data FileFieldUserOption a = FileFieldUserOption
{ fieldOptionForce :: Bool
@ -893,11 +896,21 @@ genericFileField mkOpts = Field{..}
, Map.filter (views _3 $ (&&) <$> fieldOptionForce <*> fieldOptionDefault) fieldAdditionalFiles
]

handleUpload :: FileField -> Maybe Text -> ConduitT File FileReference (YesodDB UniWorX) ()
handleUpload :: FileField -> Maybe Text -> ConduitT (File Handler) FileReference (YesodDB UniWorX) ()
handleUpload FileField{fieldMaxFileSize} mIdent
= C.filter (\File{..} -> maybe (const True) (>) fieldMaxFileSize $ maybe 0 (fromIntegral . olength) fileContent)
.| sinkFiles
.| C.mapM mkSessionFile
= C.map (transFile liftHandler)
.| C.mapMaybeM (\f@File{..} -> maybeT (return $ Just f) $ do
maxSize <- fromIntegral <$> hoistMaybe fieldMaxFileSize
fc <- hoistMaybe fileContent
let peekNE n = do
str <- C.takeE n .| C.fold
leftover str
yield str
(unsealConduitT -> fc', size) <- lift $ fc $$+ peekNE (succ maxSize) .| C.lengthE
return . guardOn (size <= maxSize) $ f { fileContent = Just fc' }
)
.| sinkFiles
.| C.mapM mkSessionFile
where
mkSessionFile fRef@FileReference{..} = fRef <$ do
now <- liftIO getCurrentTime
@ -924,7 +937,7 @@ genericFileField mkOpts = Field{..}
doUnpack
| fieldOptionForce fieldUnpackZips = fieldOptionDefault fieldUnpackZips
| otherwise = unpackZips `elem` vals
handleFile :: FileInfo -> ConduitT () File Handler ()
handleFile :: FileInfo -> ConduitT () (File Handler) Handler ()
handleFile
| doUnpack = receiveFiles
| otherwise = yieldM . acceptFile
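The new size check avoids buffering whole uploads: peekNE pulls at most maxSize + 1 bytes, pushes them back with leftover, and re-yields them, so counting succ maxSize bytes distinguishes "within limit" from "over limit" without consuming the stream. A minimal standalone sketch of the same idea (simplified types, and unlike the form-field code above this version does buffer the peeked prefix):

  import Conduit
  import Data.ByteString (ByteString)
  import qualified Data.ByteString as BS

  -- Nothing if the source yields more than maxSize bytes; Just its contents otherwise.
  limitedSink :: Monad m => Int -> ConduitT () ByteString m () -> m (Maybe ByteString)
  limitedSink maxSize src = do
    -- Pull one byte more than the limit; if we got it, the source is too big.
    peeked <- runConduit $ src .| takeCE (maxSize + 1) .| foldC
    pure $ if BS.length peeked <= maxSize then Just peeked else Nothing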
@ -12,9 +12,7 @@ import Handler.Utils.Files

import qualified Data.CaseInsensitive as CI

import qualified Data.ByteString.Lazy as LBS

import qualified Data.Conduit.List as C
import qualified Data.Conduit.Combinators as C

import qualified Text.Pandoc as P
@ -72,12 +70,13 @@ addFileDB :: ( MonadMail m
, HandlerSite m ~ UniWorX
) => FileReference -> m (Maybe MailObjectId)
addFileDB fRef = runMaybeT $ do
File{fileTitle = pack . takeBaseName -> fileName, fileContent = Just fileContent} <- lift . liftHandler . runDB $ sourceFile fRef
File{fileTitle = pack . takeBaseName -> fileName, fileContent = Just fileContent'} <- return $ sourceFile fRef
fileContent <- liftHandler . runDB . runConduit $ fileContent' .| C.sinkLazy
lift . addPart $ do
_partType .= decodeUtf8 (mimeLookup fileName)
_partEncoding .= Base64
_partDisposition .= AttachmentDisposition fileName
_partContent .= PartContent (LBS.fromStrict fileContent)
_partContent .= PartContent fileContent
setMailObjectIdPseudorandom (fileName, fileContent) :: StateT Part (HandlerFor UniWorX) MailObjectId
@ -16,11 +16,9 @@ import Handler.Utils.DateTime (getDateTimeFormatter)

import qualified Data.Text as Text

import qualified Data.ByteString.Lazy as Lazy.ByteString

import qualified Database.Esqueleto as E

import qualified Data.Conduit.List as Conduit
import qualified Data.Conduit.Combinators as C

import Handler.Utils.Rating.Format
@ -91,15 +89,16 @@ extensionRating = "txt"

ratingFile :: ( MonadHandler m
, HandlerSite m ~ UniWorX
, Monad m'
)
=> CryptoFileNameSubmission -> Rating -> m File
=> CryptoFileNameSubmission -> Rating -> m (File m')
ratingFile cID rating@Rating{ ratingValues = Rating'{..} } = do
mr'@(MsgRenderer mr) <- getMsgRenderer
dtFmt <- getDateTimeFormatter
fileModified <- maybe (liftIO getCurrentTime) return ratingTime
let
fileTitle = ensureExtension extensionRating . unpack . mr $ MsgRatingFileTitle cID
fileContent = Just . Lazy.ByteString.toStrict $ formatRating mr' dtFmt cID rating
fileContent = Just . C.sourceLazy $ formatRating mr' dtFmt cID rating
return File{..}

type SubmissionContent = Either FileReference (SubmissionId, Rating')
@ -107,13 +106,12 @@ type SubmissionContent = Either FileReference (SubmissionId, Rating')
extractRatings :: ( MonadHandler m
, HandlerSite m ~ UniWorX
) => ConduitT FileReference SubmissionContent m ()
extractRatings = Conduit.mapM $ \fRef@FileReference{..} -> liftHandler $ do
extractRatings = C.mapM $ \fRef@FileReference{..} -> liftHandler $ do
msId <- isRatingFile fileReferenceTitle
if
| Just sId <- msId
, isJust fileReferenceContent -> do
f <- runDB $ sourceFile fRef
(rating, cID) <- handle (throwM . RatingFileException fileReferenceTitle) $ parseRating f
(rating, cID) <- handle (throwM . RatingFileException fileReferenceTitle) . runDB . parseRating $ sourceFile fRef
sId' <- traverse decrypt cID
unless (maybe (const True) (==) sId' sId) $
throwM $ RatingFileException fileReferenceTitle RatingSubmissionIDIncorrect
@ -35,6 +35,8 @@ import qualified System.FilePath.Cryptographic as Explicit

import Control.Exception (ErrorCall(..))

import qualified Data.Conduit.Combinators as C

data PrettifyState
= PrettifyInitial
@ -195,8 +197,9 @@ instance ns ~ CryptoIDNamespace (CI FilePath) SubmissionId => YAML.FromYAML (May
)

parseRating :: MonadCatch m => File -> m (Rating', Maybe CryptoFileNameSubmission)
parseRating f@File{ fileContent = Just (fromStrict -> input), .. } = handle onFailure . handle (throwM . RatingParseException) . handleIf isYAMLUnicodeError (\(ErrorCall msg) -> throwM $ RatingYAMLNotUnicode msg) $ do
parseRating :: MonadCatch m => File m -> m (Rating', Maybe CryptoFileNameSubmission)
parseRating f@File{ fileContent = Just input', .. } = handle onFailure . handle (throwM . RatingParseException) . handleIf isYAMLUnicodeError (\(ErrorCall msg) -> throwM $ RatingYAMLNotUnicode msg) $ do
input <- runConduit $ input' .| C.sinkLazy
let evStream = YAML.Event.parseEvents input
delimitDocument = do
ev <- maybe (throwM RatingYAMLStreamTerminatedUnexpectedly) return =<< await
@ -16,6 +16,8 @@ import qualified Data.ByteString.Lazy as Lazy (ByteString)

import qualified Data.CaseInsensitive as CI

import qualified Data.Conduit.Combinators as C

import Text.Read (readEither)

@ -55,9 +57,9 @@ formatRating cID Rating{ ratingValues = Rating'{..}, ..} = let
]
in Lazy.Text.encodeUtf8 . (<> "\n") $ displayT doc

parseRating :: MonadCatch m => File -> m Rating'
parseRating :: MonadCatch m => File m -> m Rating'
parseRating File{ fileContent = Just input, .. } = handle (throwM . RatingParseLegacyException) $ do
inputText <- either (throwM . RatingNotUnicode) return $ Text.decodeUtf8' input
inputText <- either (throwM . RatingNotUnicode) return . Text.decodeUtf8' =<< runConduit (input .| C.fold)
let
(headerLines', commentLines) = break (commentSep `Text.isInfixOf`) $ Text.lines inputText
(reverse -> ratingLines, reverse -> _headerLines) = break (sep' `Text.isInfixOf`) $ reverse headerLines'
@ -256,7 +256,7 @@ planSubmissions sid restriction = do
maximumsBy f xs = flip Set.filter xs $ \x -> maybe True (((==) `on` f) x . maximumBy (comparing f)) $ fromNullable xs

submissionFileSource :: SubmissionId -> ConduitT () File (YesodDB UniWorX) ()
submissionFileSource :: SubmissionId -> ConduitT () DBFile (YesodDB UniWorX) ()
submissionFileSource subId = E.selectSource (E.from $ submissionFileQuery subId)
.| C.map entityVal
.| sourceFiles'
@ -319,7 +319,7 @@ submissionMultiArchive anonymous (Set.toList -> ids) = do
setContentDisposition' $ Just ((addExtension `on` unpack) (mr archiveName) extensionZip)
respondSource typeZip . (<* lift cleanup) . transPipe (runDBRunner dbrunner) $ do
let
fileEntitySource' :: (Rating, Entity Submission, Maybe UTCTime, (SheetName,CourseShorthand,SchoolId,TermId,Bool)) -> ConduitT () File (YesodDB UniWorX) ()
fileEntitySource' :: (Rating, Entity Submission, Maybe UTCTime, (SheetName,CourseShorthand,SchoolId,TermId,Bool)) -> ConduitT () DBFile (YesodDB UniWorX) ()
fileEntitySource' (rating, Entity submissionID Submission{}, subTime, (shn,csh,ssh,tid,sheetAnonymous)) = do
cID <- encrypt submissionID
@ -13,17 +13,14 @@ module Handler.Utils.Zip

import Import

import Handler.Utils.Files (acceptFile)
import Handler.Utils.DateTime (localTimeToUTCSimple, utcToLocalTime)

import Codec.Archive.Zip.Conduit.Types
import Codec.Archive.Zip.Conduit.UnZip
import Codec.Archive.Zip.Conduit.Zip

-- import qualified Data.ByteString.Lazy as Lazy (ByteString)
import qualified Data.ByteString.Lazy as Lazy.ByteString

import qualified Data.ByteString as ByteString

import System.FilePath
import Data.Time.LocalTime (localTimeToUTC, utcToLocalTime)

import Data.List (dropWhileEnd)

@ -38,6 +35,10 @@ import Data.Encoding ( decodeStrictByteStringExplicit
import Data.Encoding.CP437
import qualified Data.Char as Char

import Control.Monad.Trans.Cont
import Control.Monad.Trans.State.Strict (evalStateT)
import qualified Control.Monad.State.Class as State

typeZip :: ContentType
typeZip = "application/zip"
@ -53,94 +54,157 @@ instance Default ZipInfo where
}

consumeZip :: forall b m.
( MonadThrow b
, MonadThrow m
, MonadBase b m
, PrimMonad b
)
=> ConduitT ByteString File m ZipInfo
consumeZip = transPipe liftBase unZipStream `fuseUpstream` consumeZip'
where
consumeZip' :: ConduitT (Either ZipEntry ByteString) File m ()
consumeZip' = do
input <- await
case input of
Nothing -> return ()
Just (Right _) -> throwM $ userError "Data chunk in unexpected place when parsing ZIP"
Just (Left ZipEntry{..}) -> do
contentChunks <- toConsumer accContents
zipEntryName' <- decodeZipEntryName zipEntryName
let
fileTitle = dropWhile isPathSeparator . dropTrailingPathSeparator . normalise $ makeValid zipEntryName'
fileModified = localTimeToUTC utc zipEntryTime
fileContent
| hasTrailingPathSeparator zipEntryName' = Nothing
| otherwise = Just $ mconcat contentChunks
yield File{..}
consumeZip'
accContents :: ConduitT (Either a b') Void m [b']
accContents = do
input <- await
case input of
Just (Right x) -> (x :) <$> accContents
Just (Left x) -> [] <$ leftover (Left x)
_ -> return []
data ConsumeZipException
= ConsumeZipUnZipException SomeException
| ConsumeZipUnexpectedContent
deriving (Show, Generic, Typeable)
deriving anyclass (Exception)

produceZip :: forall b m.
( MonadThrow b
, MonadThrow m
, MonadBase b m
, PrimMonad b

consumeZip :: forall m m'.
( MonadThrow m
, PrimMonad m
, MonadUnliftIO m
, MonadResource m
, MonadIO m'
, MonadThrow m'
)
=> ConduitT () ByteString m () -> ConduitT () (File m') m ZipInfo
consumeZip inpBS = do
inpChunk <- liftIO newEmptyTMVarIO
zipAsync <- lift . allocateLinkedAsync $
runConduit $ (inpBS .| unZipStream) `fuseUpstream` C.mapM_ (atomically . putTMVar inpChunk)

flip evalStateT Nothing . evalContT . callCC $ \finishConsume -> forever $ do
inpChunk' <- atomically $
Right <$> takeTMVar inpChunk
<|> Left <$> waitCatchSTM zipAsync

fileSink <- State.get
case (fileSink, inpChunk') of
(mFSink , Left (Left unzipExc) ) -> do
for_ mFSink $ \fSink' -> atomically $ do
writeTMChan fSink' $ Left unzipExc
closeTMChan fSink'
throwM unzipExc

(mFSink , Left (Right zInfo) ) -> do
for_ mFSink $ atomically . closeTMChan
finishConsume zInfo

(Just fSink, Right (Right bs) ) ->
atomically . writeTMChan fSink $ Right bs

(Nothing , Right (Right _) ) ->
throwM ConsumeZipUnexpectedContent

(mFSink , Right (Left ZipEntry{..})) -> do
for_ mFSink $ atomically . closeTMChan
State.put Nothing

zipEntryName' <- decodeZipEntryName zipEntryName
let
fileTitle = "." <//> zipEntryName'
& normalise
& makeValid
& dropWhile isPathSeparator
& dropWhileEnd isPathSeparator
& normalise
& makeValid
fileModified = localTimeToUTCSimple zipEntryTime
isDirectory = hasTrailingPathSeparator zipEntryName'
fileContent <- if
| isDirectory -> return Nothing
| otherwise -> do
fileChan <- liftIO newTMChanIO
State.put $ Just fileChan
return . Just . evalContT . callCC $ \finishFileContent -> forever $ do
nextVal <- atomically $ asum
[ readTMChan fileChan
, do
inpChunk'' <- Right <$> takeTMVar inpChunk
<|> Left <$> waitCatchSTM zipAsync
case inpChunk'' of
Left (Left unzipExc) -> return . Just $ Left unzipExc
Left (Right _ ) -> return Nothing
Right (Left zInfo ) -> Nothing <$ putTMVar inpChunk (Left zInfo)
Right (Right bs ) -> return . Just $ Right bs
]
case nextVal of
Nothing -> finishFileContent ()
Just (Right bs) -> lift $ yield bs
Just (Left exc) -> throwM $ ConsumeZipUnZipException exc
lift . lift $ yield File{..}

produceZip :: forall m.
( MonadThrow m
, PrimMonad m
)
=> ZipInfo
-> ConduitT File ByteString m ()
produceZip info = C.map toZipData .| transPipe liftBase (void $ zipStream zipOptions)
-> ConduitT (File m) ByteString m ()
produceZip info = C.map toZipData .| void (zipStream zipOptions)
where
zipOptions = ZipOptions
{ zipOpt64 = True
, zipOptCompressLevel = -1 -- This is passed through all the way to the C zlib, where it means "default level"
{ zipOpt64 = False
, zipOptCompressLevel = defaultCompression
, zipOptInfo = info
}

toZipData :: File -> (ZipEntry, ZipData b)
toZipData f@File{..} =
let zData = maybe mempty (ZipDataByteString . Lazy.ByteString.fromStrict) fileContent
zEntry = (toZipEntry f){ zipEntrySize = fromIntegral . ByteString.length <$> fileContent }
in (zEntry, zData)
-- toZipData :: forall v. File m -> ConduitT v (ZipEntry, ZipData m) m ()
-- toZipData f
-- | Just fc <- fileContent f = do
-- outpChunk <- newEmptyTMVarIO
-- outpAsync <- lift . allocateLinkedAsync $
-- runConduit $ fc .| C.mapM_ (atomically . putTMVar outpChunk)
-- yield ( toZipEntry f
-- , ZipDataSource . evalContT . callCC $ \finishContent -> forever $ do
-- nextVal <- atomically $
-- Right <$> takeTMVar outpChunk
-- <|> Left <$> waitCatchSTM outpAsync
-- case nextVal of
-- Right chunk -> lift $ yield chunk
-- Left (Right () ) -> finishContent ()
-- Left (Left exc) -> throwM exc
-- )
-- | otherwise = yield (toZipEntry f, mempty)

toZipEntry :: File -> ZipEntry
toZipData :: File m -> (ZipEntry, ZipData m)
toZipData f@File{..}
= (toZipEntry f, maybe mempty ZipDataSource fileContent)

toZipEntry :: File m -> ZipEntry
toZipEntry File{..} = ZipEntry{..}
where
isDir = isNothing fileContent
isDir = is _Nothing fileContent

zipEntryName = encodeZipEntryName . bool (dropWhileEnd isPathSeparator) addTrailingPathSeparator isDir . normalise $ makeValid fileTitle
zipEntryTime = utcToLocalTime utc fileModified
zipEntryName = "." <//> fileTitle
& normalise
& makeValid
& dropWhile isPathSeparator
& dropWhileEnd isPathSeparator
& bool id addTrailingPathSeparator isDir
& normalise
& makeValid
& encodeZipEntryName
zipEntryTime = utcToLocalTime fileModified
zipEntrySize = Nothing
zipEntryExternalAttributes = Nothing

modifyFileTitle :: Monad m => (FilePath -> FilePath) -> ConduitT File File m ()
modifyFileTitle :: Monad m => (FilePath -> FilePath) -> ConduitT (File m') (File m') m ()
modifyFileTitle f = mapC $ \x@File{..} -> x{ fileTitle = f fileTitle }

-- Takes a FileInfo and, if it is a ZIP archive, extracts the contained files; otherwise yields the FileInfo as a single file
receiveFiles :: (MonadLogger m, MonadResource m, MonadThrow m, MonadBase IO m) => FileInfo -> ConduitT () File m ()
receiveFiles :: (MonadLogger m, MonadResource m, MonadThrow m, PrimMonad m, MonadUnliftIO m, MonadResource m', MonadThrow m') => FileInfo -> ConduitT () (File m') m ()
receiveFiles fInfo
| ((==) `on` simpleContentType) mimeType typeZip = do
$logInfoS "sourceFiles" "Unpacking ZIP"
fileSource fInfo .| void consumeZip
void . consumeZip $ fileSource fInfo
| otherwise = do
$logDebugS "sourceFiles" [st|Not unpacking file of type #{decodeUtf8 mimeType}|]
yieldM $ acceptFile fInfo
where
mimeType = mimeLookup $ fileName fInfo

acceptFile :: MonadResource m => FileInfo -> m File
acceptFile fInfo = do
let fileTitle = dropWhile isPathSeparator . dropTrailingPathSeparator . normalise . unpack $ fileName fInfo
fileModified <- liftIO getCurrentTime
fileContent <- fmap Just . runConduit $ fileSource fInfo .| foldC
return File{..}

decodeZipEntryName :: MonadThrow m => Either Text ByteString -> m FilePath
-- ^ Extract the filename from a 'ZipEntry' doing decoding along the way.
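consumeZip's restructuring follows a general pattern: run the unzip pipeline in a linked async that publishes into a TMVar, and have the demand-driven consumer atomically race taking the next chunk against the async's termination. A minimal self-contained sketch of that hand-off (stm/async only; the names are illustrative, not from the codebase):

  import Control.Applicative ((<|>))
  import Control.Concurrent.Async (async, waitCatchSTM)
  import Control.Concurrent.STM
  import Control.Exception (throwIO)

  -- The producer pushes chunks into a TMVar; the consumer races "take next
  -- chunk" against "producer finished". STM's <|> tries the left branch
  -- first, so a pending chunk always wins over termination.
  handOff :: IO ()
  handOff = do
    var  <- newEmptyTMVarIO
    prod <- async $ mapM_ (atomically . putTMVar var) ["chunk1", "chunk2" :: String]
    let go = do
          next <- atomically $ (Right <$> takeTMVar var) <|> (Left <$> waitCatchSTM prod)
          case next of
            Right chunk     -> putStrLn chunk >> go
            Left (Left exc) -> throwIO exc   -- producer failed
            Left (Right ()) -> pure ()       -- producer done, TMVar drained
    go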
@ -4,6 +4,7 @@ module Import

import Foundation as Import
import Import.NoFoundation as Import
import Model.Migration as Import

import Utils.SystemMessage as Import
import Utils.Metrics as Import
@ -4,7 +4,6 @@ module Import.NoFoundation

import Import.NoModel as Import
import Model as Import
import Model.Migration as Import
import Model.Rating as Import
import Model.Submission as Import
import Model.Tokens as Import
@ -128,6 +128,8 @@ import Data.Proxy as Import (Proxy(..))

import Data.List.PointedList as Import (PointedList)

import Language.Haskell.TH.Syntax as Import (Lift(liftTyped))

import Language.Haskell.TH.Instances as Import ()
import Data.NonNull.Instances as Import ()
import Data.Monoid.Instances as Import ()
src/Jobs.hs
@ -5,7 +5,7 @@ module Jobs
, stopJobCtl
) where

import Import
import Import hiding (StateT)
import Jobs.Types as Types hiding (JobCtl(JobCtlQueue))
import Jobs.Queue
import Jobs.Crontab
@ -30,6 +30,7 @@ import qualified Data.Map.Strict as Map
import Data.Map.Strict ((!))

import Control.Monad.Trans.RWS.Lazy (RWST, mapRWST, evalRWST)
import Control.Monad.Trans.State.Strict (StateT, evalStateT)
import qualified Control.Monad.State.Class as State
import Control.Monad.Writer.Class (MonadWriter(..))
import Control.Monad.Trans.Cont (ContT(..), callCC)
@ -99,6 +100,7 @@ handleJobs foundation@UniWorX{..}
jobConfirm <- liftIO $ newTVarIO HashMap.empty
jobShutdown <- liftIO newEmptyTMVarIO
jobCurrentCrontab <- liftIO $ newTVarIO Nothing
jobHeldLocks <- liftIO $ newTVarIO Set.empty
atomically $ putTMVar appJobState JobState
{ jobContext = JobContext{..}
, ..
@ -114,8 +116,9 @@ manageCrontab foundation@UniWorX{..} unmask = do
jState <- atomically $ readTMVar appJobState
liftIO . unsafeHandler foundation . void $ do
atomically . assertM_ (not . Map.null . jobWorkers) $ readTMVar appJobState
runReaderT ?? foundation $
writeJobCtlBlock JobCtlDetermineCrontab
when (has (_appJobCronInterval . _Just) foundation) $
runReaderT ?? foundation $
writeJobCtlBlock JobCtlDetermineCrontab
void $ evalRWST (forever execCrontab) jState HashMap.empty

let awaitTermination = guardM $
@ -414,13 +417,14 @@ handleJobs' wNum = C.mapM_ $ \jctl -> withJobWorkerState wNum JobWorkerBusy $ do
handleCmd JobCtlTest = return ()
handleCmd JobCtlFlush = void . lift . runDB . runConduit $ selectKeys [] [ Asc QueuedJobCreationTime ] .| C.mapM_ (\j -> lift $ runReaderT (writeJobCtl $ JobCtlPerform j) =<< getYesod)
handleCmd (JobCtlQueue job) = lift $ queueJob' job
handleCmd (JobCtlPerform jId) = lift . handle handleQueueException . jLocked jId $ \j@QueuedJob{..} -> do
handleCmd (JobCtlPerform jId) = handle handleQueueException . jLocked jId $ \(Entity _ j@QueuedJob{..}) -> lift $ do
content <- case fromJSON queuedJobContent of
Aeson.Success c -> return c
Aeson.Error t -> do
$logErrorS logIdent $ "Aeson decoding error: " <> pack t
throwM $ JInvalid jId j

$logInfoS logIdent $ tshow content
$logDebugS logIdent . LT.toStrict . decodeUtf8 $ Aeson.encode content

instanceID' <- getsYesod $ view instanceID
@ -466,40 +470,45 @@ handleJobs' wNum = C.mapM_ $ \jctl -> withJobWorkerState wNum JobWorkerBusy $ do
. Set.filter (((/=) `on` classifyHealthReport) newReport . snd)
atomically . modifyTVar' hrStorage $ force . updateReports

jLocked :: QueuedJobId -> (QueuedJob -> Handler a) -> Handler a
jLocked jId act = do
hasLock <- liftIO $ newTVarIO False

jLocked :: QueuedJobId -> (Entity QueuedJob -> ReaderT JobContext Handler a) -> ReaderT JobContext Handler a
jLocked jId act = flip evalStateT False $ do
let
lock = runDB . setSerializable $ do
qj@QueuedJob{..} <- maybe (throwM $ JNonexistant jId) return =<< get jId
lock :: StateT Bool (ReaderT JobContext Handler) (Entity QueuedJob)
lock = hoist (hoist $ runDB . setSerializable) $ do
qj@QueuedJob{..} <- lift . lift $ maybe (throwM $ JNonexistant jId) return =<< get jId
instanceID' <- getsYesod $ view instanceID
threshold <- getsYesod $ view _appJobStaleThreshold
now <- liftIO getCurrentTime
heldLocks <- asks jobHeldLocks
isHeld <- (jId `Set.member`) <$> readTVarIO heldLocks
hadStale <- maybeT (return False) $ do
lockTime <- MaybeT $ return queuedJobLockTime
lockInstance <- MaybeT $ return queuedJobLockInstance
if
| lockInstance == instanceID'
, diffUTCTime now lockTime >= threshold
, not isHeld
-> return True
| otherwise
-> throwM $ JLocked jId lockInstance lockTime
when hadStale .
$logWarnS "Jobs" $ "Ignored stale lock: " <> tshow (Entity jId qj)
val <- updateGet jId [ QueuedJobLockInstance =. Just instanceID'
, QueuedJobLockTime =. Just now
]
liftIO . atomically $ writeTVar hasLock True
return val
State.put True
val <- lift . lift $ updateGet jId [ QueuedJobLockInstance =. Just instanceID'
, QueuedJobLockTime =. Just now
]
atomically . modifyTVar' heldLocks $ Set.insert jId
return $ Entity jId val

unlock = whenM (readTVarIO hasLock) $
runDB . setSerializable $
update jId [ QueuedJobLockInstance =. Nothing
, QueuedJobLockTime =. Nothing
]
unlock :: Entity QueuedJob -> StateT Bool (ReaderT JobContext Handler) ()
unlock (Entity jId' _) = whenM State.get $ do
atomically . flip modifyTVar' (Set.delete jId') =<< asks jobHeldLocks
lift . lift . runDB . setSerializable $
update jId' [ QueuedJobLockInstance =. Nothing
, QueuedJobLockTime =. Nothing
]

bracket lock (const unlock) act
bracket lock unlock $ lift . act

pruneLastExecs :: Crontab JobCtl -> DB ()
@ -40,22 +40,14 @@ determineCrontab = execWriterT $ do
}
Nothing -> return ()

tell $ HashMap.singleton
JobCtlDetermineCrontab
Cron
{ cronInitial = CronAsap
, cronRepeat = CronRepeatScheduled CronAsap
, cronRateLimit = appJobCronInterval
, cronNotAfter = Right CronNotScheduled
}
whenIsJust appPruneUnreferencedFiles $ \pInterval ->
whenIsJust appJobCronInterval $ \interval ->
tell $ HashMap.singleton
(JobCtlQueue JobPruneUnreferencedFiles)
JobCtlDetermineCrontab
Cron
{ cronInitial = CronAsap
, cronRepeat = CronRepeatScheduled CronAsap
, cronRateLimit = pInterval
, cronNotAfter = Right CronNotScheduled
{ cronInitial = CronAsap
, cronRepeat = CronRepeatScheduled CronAsap
, cronRateLimit = interval
, cronNotAfter = Right CronNotScheduled
}

oldestInvitationMUTC <- lift $ preview (_head . _entityVal . _invitationExpiresAt . _Just) <$> selectList [InvitationExpiresAt !=. Nothing] [Asc InvitationExpiresAt, LimitTo 1]
@ -98,6 +90,15 @@ determineCrontab = execWriterT $ do
, cronRateLimit = iInterval
, cronNotAfter = Right CronNotScheduled
}
whenIsJust appRechunkFiles $ \rInterval ->
tell $ HashMap.singleton
(JobCtlQueue JobRechunkFiles)
Cron
{ cronInitial = CronAsap
, cronRepeat = CronRepeatScheduled CronAsap
, cronRateLimit = rInterval
, cronNotAfter = Right CronNotScheduled
}

tell . flip foldMap universeF $ \kind ->
case appHealthCheckInterval kind of
@ -138,33 +139,31 @@ determineCrontab = execWriterT $ do
, cronNotAfter = Right CronNotScheduled
}

let
getNextIntervals within interval cInterval = do
now <- liftIO getPOSIXTime
return $ do
let
epochInterval = within / 2
(currEpoch, epochNow) = now `divMod'` epochInterval
currInterval = epochNow `div'` interval
numIntervals = floor $ epochInterval / interval
n = ceiling $ 4 * cInterval / interval
i <- [ negate (ceiling $ n % 2) .. ceiling $ n % 2 ]
let
((+ currEpoch) -> nextEpoch, nextInterval) = (currInterval + i) `divMod` numIntervals
nextIntervalTime
= posixSecondsToUTCTime $ fromInteger nextEpoch * epochInterval + fromInteger nextInterval * interval
return (nextEpoch, nextInterval, nextIntervalTime, numIntervals)

if
| is _Just appLdapConf
, is _Just appLdapConf
, Just syncWithin <- appSynchroniseLdapUsersWithin
, Just cInterval <- appJobCronInterval
-> do
now <- liftIO getPOSIXTime
let
epochInterval = syncWithin / 2
interval = appSynchroniseLdapUsersInterval
nextIntervals <- getNextIntervals syncWithin appSynchroniseLdapUsersInterval cInterval

(ldapEpoch, epochNow) = now `divMod'` epochInterval
ldapInterval = epochNow `div'` interval
numIntervals = floor $ epochInterval / interval

nextIntervals = do
let
n = ceiling $ 4 * appJobCronInterval / appSynchroniseLdapUsersInterval
i <- [negate (ceiling $ n % 2) .. ceiling $ n % 2]
let
((+ ldapEpoch) -> nextEpoch, nextInterval) = (ldapInterval + i) `divMod` numIntervals
nextIntervalTime
= posixSecondsToUTCTime $ fromInteger nextEpoch * epochInterval + fromInteger nextInterval * interval
return (nextEpoch, nextInterval, nextIntervalTime)

forM_ nextIntervals $ \(nextEpoch, nextInterval, nextIntervalTime) -> do
$logDebugS "SynchroniseLdap" [st|currentTime: #{tshow ldapEpoch}.#{tshow epochNow}; upcomingSync: #{tshow nextEpoch}.#{tshow (fromInteger nextInterval * interval)}; upcomingData: #{tshow (numIntervals, nextEpoch, nextInterval)}|]
forM_ nextIntervals $ \(nextEpoch, nextInterval, nextIntervalTime, numIntervals) -> do
tell $ HashMap.singleton
(JobCtlQueue JobSynchroniseLdap
{ jEpoch = fromInteger nextEpoch
@ -180,6 +179,22 @@ determineCrontab = execWriterT $ do
| otherwise
-> return ()

whenIsJust ((,) <$> appPruneUnreferencedFilesWithin <*> appJobCronInterval) $ \(within, cInterval) -> do
nextIntervals <- getNextIntervals within appPruneUnreferencedFilesInterval cInterval
forM_ nextIntervals $ \(nextEpoch, nextInterval, nextIntervalTime, numIntervals) -> do
tell $ HashMap.singleton
(JobCtlQueue JobPruneUnreferencedFiles
{ jEpoch = fromInteger nextEpoch
, jNumIterations = fromInteger numIntervals
, jIteration = fromInteger nextInterval
}
)
Cron
{ cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ nextIntervalTime
, cronRepeat = CronRepeatNever
, cronRateLimit = appPruneUnreferencedFilesInterval
, cronNotAfter = Left within
}

let
sheetJobs (Entity nSheet Sheet{..}) = do
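getNextIntervals carves a recurring window into fixed slots: epochInterval = within / 2, numIntervals = floor (epochInterval / interval), and each crontab evaluation proposes the slots within ±⌈n/2⌉ of the current one, where n = ⌈4 * cInterval / interval⌉. A worked example using the prune settings from the configuration above (cInterval is illustrative, since job-cron-interval is deployment-specific):

  -- within    = 57600 s  =>  epochInterval = 28800 s (two epochs per window)
  -- interval  =  3600 s  =>  numIntervals  = 28800 / 3600 = 8 slots per epoch
  -- cInterval =   600 s  =>  n = ceiling (4 * 600 / 3600) = 1, so the slots
  --   currInterval - 1 .. currInterval + 1 are (re)scheduled each evaluation.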
@ -1,27 +1,36 @@
|
||||
module Jobs.Handler.Files
|
||||
( dispatchJobPruneSessionFiles
|
||||
, dispatchJobPruneUnreferencedFiles
|
||||
, dispatchJobInjectFiles
|
||||
, dispatchJobInjectFiles, dispatchJobRechunkFiles
|
||||
) where
|
||||
|
||||
import Import hiding (matching)
|
||||
import Import hiding (matching, maximumBy, init)
|
||||
|
||||
import Database.Persist.Sql (deleteWhereCount)
|
||||
|
||||
import qualified Database.Esqueleto as E
|
||||
import qualified Database.Esqueleto.PostgreSQL as E
|
||||
import qualified Database.Esqueleto.Utils as E
|
||||
import qualified Database.Esqueleto.Internal.Sql as E (unsafeSqlCastAs)
|
||||
|
||||
import qualified Data.Conduit.Combinators as C
|
||||
import qualified Data.Conduit.List as C (mapMaybe)
|
||||
import qualified Data.Conduit.List as C (mapMaybe, unfoldM)
|
||||
|
||||
import Handler.Utils.Minio
|
||||
import qualified Network.Minio as Minio
|
||||
|
||||
import qualified Crypto.Hash as Crypto
|
||||
import qualified Data.ByteString.Base64.URL as Base64
|
||||
import Crypto.Hash (hashDigestSize, digestFromByteString)
|
||||
|
||||
import Control.Monad.Memo (startEvalMemoT, memo)
|
||||
import Data.List ((!!), unfoldr, maximumBy, init, genericLength)
|
||||
import qualified Data.ByteString as ByteString
|
||||
import Data.Bits (Bits(shiftR))
|
||||
|
||||
import qualified Data.Map.Strict as Map
|
||||
|
||||
import Control.Monad.Random.Lazy
|
||||
import System.Random.Shuffle (shuffleM)
|
||||
import System.IO.Unsafe
|
||||
|
||||
import Handler.Utils.Files (sourceFileDB)
|
||||
|
||||
|
||||
dispatchJobPruneSessionFiles :: JobHandler UniWorX
|
||||
@ -44,72 +53,190 @@ fileReferences (E.just -> fHash)
  , E.from $ \subFile -> E.where_ $ subFile E.^. SubmissionFileContent E.==. fHash
  , E.from $ \sessFile -> E.where_ $ sessFile E.^. SessionFileContent E.==. fHash
  , E.from $ \lock -> E.where_ $ E.just (lock E.^. FileLockContent) E.==. fHash
  , E.from $ \chunkLock -> E.where_ . E.exists . E.from $ \fileContentEntry ->
      E.where_ $ E.just (fileContentEntry E.^. FileContentEntryHash) E.==. fHash
        E.&&. chunkLock E.^. FileChunkLockHash E.==. E.subSelectForeign fileContentEntry FileContentEntryChunkHash (E.^. FileContentChunkHash)
  ]


{-# NOINLINE pruneUnreferencedFilesIntervalsCache #-}
pruneUnreferencedFilesIntervalsCache :: TVar (Map Natural [(Maybe FileContentChunkReference, Maybe FileContentChunkReference)])
pruneUnreferencedFilesIntervalsCache = unsafePerformIO $ newTVarIO Map.empty

dispatchJobPruneUnreferencedFiles :: JobHandler UniWorX
dispatchJobPruneUnreferencedFiles = JobHandlerAtomic . hoist lift $ do
dispatchJobPruneUnreferencedFiles :: Natural -> Natural -> Natural -> JobHandler UniWorX
dispatchJobPruneUnreferencedFiles numIterations epoch iteration = JobHandlerAtomic . hoist lift $ do
  now <- liftIO getCurrentTime
  interval <- fmap (fmap $ max 0) . getsYesod $ view _appPruneUnreferencedFiles
  interval <- getsYesod $ view _appPruneUnreferencedFilesInterval
  keep <- fmap (max 0) . getsYesod $ view _appKeepUnreferencedFiles

  E.update $ \fileContent -> do
    let isReferenced = E.any E.exists . fileReferences $ fileContent E.^. FileContentHash
        now' = E.unsafeSqlCastAs "TIMESTAMP WITH TIME ZONE" $ E.val now
        shouldBe = E.bool (E.just . E.maybe now' (E.min now') $ fileContent E.^. FileContentUnreferencedSince) E.nothing isReferenced
    E.set fileContent [ FileContentUnreferencedSince E.=. shouldBe ]
  let
    chunkHashBytes :: forall h.
      ( Unwrapped FileContentChunkReference ~ Digest h )
      => Integer
    chunkHashBytes = fromIntegral (hashDigestSize (error "hashDigestSize inspected argument" :: h))
    chunkHashBits = chunkHashBytes * 8
    base :: Integer
    base = 2 ^ chunkHashBits
    intervals :: [Integer]
    -- | Exclusive upper bounds
    intervals
      | numIterations <= 0 = pure base
      | otherwise = go protoIntervals ^.. folded . _1
      where
        go [] = []
        go ints
          | maximumOf (folded . _1) ints == Just base = ints
          | otherwise = go $ lts ++ over _1 succ (over _2 (subtract $ toInteger numIterations) closest) : map (over _1 succ) gts
          where
            closest = maximumBy (comparing $ view _2) ints
            (lts, geqs) = partition (((>) `on` view _1) closest) ints
            gts = filter (((<) `on` view _1) closest) geqs
        -- | Exclusive upper bounds
        protoIntervals :: [(Integer, Integer)]
        protoIntervals = [ over _1 (i *) $ base `divMod` toInteger numIterations
                         | i <- [1 .. toInteger numIterations]
                         ]

    intervalsDgsts' = zipWith (curry . over both $ toDigest <=< assertM' (> 0)) (0 : init intervals) intervals

    toDigest :: Integer -> Maybe FileContentChunkReference
    toDigest = fmap (review _Wrapped) . digestFromByteString . pad . ByteString.pack . reverse . unfoldr step
      where step i
              | i <= 0 = Nothing
              | otherwise = Just (fromIntegral i, i `shiftR` 8)
            pad bs
              | toInteger (ByteString.length bs) >= chunkHashBytes = bs
              | otherwise = pad $ ByteString.cons 0 bs

  intervalsDgsts <- atomically $ do
    cachedDgsts <- readTVar pruneUnreferencedFilesIntervalsCache
    case Map.lookup numIterations cachedDgsts of
      Just c -> return c
      Nothing -> do
        modifyTVar' pruneUnreferencedFilesIntervalsCache $ force . Map.insert numIterations intervalsDgsts'
        return intervalsDgsts'
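
The `intervals` machinery above splits the chunk-hash keyspace into `numIterations` contiguous slices (given as exclusive upper bounds), so that each cron iteration only has to scan one slice of the chunk table. A minimal standalone sketch of the intended result, as a direct computation rather than the incremental `go` refinement; `intervalBounds` is a hypothetical helper, not part of the patch:

intervalBounds :: Integer -> Natural -> [(Integer, Integer)]
-- ^ k half-open intervals [lower, upper) covering [0, base), with sizes
-- differing by at most one; assumes k > 0 (Natural from Numeric.Natural)
intervalBounds base k =
  [ ((i - 1) * q + min (i - 1) r, i * q + min i r)
  | i <- [1 .. toInteger k]
  ]
  where (q, r) = base `divMod` toInteger k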

  let
    getCandidates = E.selectSource . E.from $ \fileContent -> do
      E.where_ . E.maybe E.false (E.<. E.val (addUTCTime (-keep) now)) $ fileContent E.^. FileContentUnreferencedSince
      return ( fileContent E.^. FileContentHash
             , E.length_ $ fileContent E.^. FileContentContent
    permIntervalsDgsts = shuffleM intervalsDgsts `evalRand` mkStdGen (hash epoch)

    (minBoundDgst, maxBoundDgst) = permIntervalsDgsts !! fromIntegral (toInteger iteration `mod` genericLength permIntervalsDgsts)
    chunkIdFilter :: E.SqlExpr (E.Value FileContentChunkReference) -> E.SqlExpr (E.Value Bool)
    chunkIdFilter cRef = E.and $ catMaybes
      [ minBoundDgst <&> \b -> cRef E.>=. E.val b
      , maxBoundDgst <&> \b -> cRef E.<. E.val b
      ]

  $logDebugS "PruneUnreferencedFiles" . tshow $ (minBoundDgst, maxBoundDgst)

  E.insertSelectWithConflict
    (UniqueFileContentChunkUnreferenced $ error "insertSelectWithConflict inspected constraint")
    (E.from $ \fileContentChunk -> do
      E.where_ . E.not_ . E.subSelectOr . E.from $ \fileContentEntry -> do
        E.where_ $ fileContentEntry E.^. FileContentEntryChunkHash E.==. fileContentChunk E.^. FileContentChunkId
        return . E.any E.exists . fileReferences $ fileContentEntry E.^. FileContentEntryHash
      E.where_ . chunkIdFilter $ fileContentChunk E.^. FileContentChunkHash
      return $ FileContentChunkUnreferenced E.<# (fileContentChunk E.^. FileContentChunkId) E.<&> E.val now
    )
    (\current excluded ->
      [ FileContentChunkUnreferencedSince E.=. E.min (current E.^. FileContentChunkUnreferencedSince) (excluded E.^. FileContentChunkUnreferencedSince) ]
    )

  E.delete . E.from $ \fileContentChunkUnreferenced -> do
    E.where_ . E.subSelectOr . E.from $ \fileContentEntry -> do
      E.where_ $ fileContentEntry E.^. FileContentEntryChunkHash E.==. fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedHash
      return . E.any E.exists . fileReferences $ fileContentEntry E.^. FileContentEntryHash
    E.where_ . chunkIdFilter $ E.subSelectForeign fileContentChunkUnreferenced FileContentChunkUnreferencedHash (E.^. FileContentChunkHash)

  let
    getEntryCandidates = E.selectSource . E.from $ \fileContentEntry -> do
      let unreferencedSince = E.subSelectMaybe . E.from $ \(fileContentEntry' `E.InnerJoin` fileContentChunkUnreferenced) -> do
            E.on $ fileContentEntry' E.^. FileContentEntryChunkHash E.==. fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedHash
            E.where_ $ fileContentEntry' E.^. FileContentEntryHash E.==. fileContentEntry E.^. FileContentEntryHash
            E.where_ . chunkIdFilter $ E.subSelectForeign fileContentChunkUnreferenced FileContentChunkUnreferencedHash (E.^. FileContentChunkHash)
            return . E.max_ $ fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedSince
      E.where_ $ E.maybe E.false (E.<. E.val (addUTCTime (-keep) now)) unreferencedSince

      E.groupBy $ fileContentEntry E.^. FileContentEntryHash
      E.orderBy [ E.asc $ fileContentEntry E.^. FileContentEntryHash ]

      return $ fileContentEntry E.^. FileContentEntryHash

    deleteEntry :: _ -> DB (Sum Natural)
    deleteEntry (E.Value fRef) =
      bool 0 1 . (> 0) <$> deleteWhereCount [FileContentEntryHash ==. fRef]

  Sum deletedEntries <- runConduit $
    getEntryCandidates
    .| takeWhileTime (interval / 3)
    .| C.mapM deleteEntry
    .| C.fold

  when (deletedEntries > 0) $
    $logInfoS "PruneUnreferencedFiles" [st|Deleted #{tshow deletedEntries} long-unreferenced files|]

  let
    getChunkCandidates = E.selectSource . E.from $ \fileContentChunkUnreferenced -> do
      E.where_ $ fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedSince E.<. E.val (addUTCTime (-keep) now)
      E.where_ . E.not_ . E.exists . E.from $ \fileContentEntry ->
        E.where_ $ fileContentEntry E.^. FileContentEntryChunkHash E.==. fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedHash

      E.where_ . chunkIdFilter $ E.subSelectForeign fileContentChunkUnreferenced FileContentChunkUnreferencedHash (E.^. FileContentChunkHash)

      return ( fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedHash
             , E.subSelectForeign fileContentChunkUnreferenced FileContentChunkUnreferencedHash $ E.length_ . (E.^. FileContentChunkContent)
             )

  Sum deleted <- runConduit $
    getCandidates
    .| maybe (C.map id) (takeWhileTime . (/ 2)) interval
    deleteChunk :: _ -> DB (Sum Natural, Sum Word64)
    deleteChunk (E.Value cRef, E.Value size) = do
      deleteWhere [ FileContentChunkUnreferencedHash ==. cRef ]
      (, Sum size) . fromIntegral <$> deleteWhereCount [FileContentChunkHash ==. unFileContentChunkKey cRef]

  (Sum deletedChunks, Sum deletedChunkSize) <- runConduit $
    getChunkCandidates
    .| takeWhileTime (interval / 3)
    .| persistentTokenBucketTakeC' TokenBucketPruneFiles (view $ _2 . _Value :: _ -> Word64)
    .| C.map (view $ _1 . _Value)
    .| C.mapM (\fRef -> Sum <$> deleteWhereCount [FileContentHash ==. fRef])
    .| C.mapM deleteChunk
    .| C.fold
  when (deleted > 0) $
    $logInfoS "PruneUnreferencedFiles" [st|Deleted #{deleted} long-unreferenced files|]

  when (deletedChunks > 0) $
    $logInfoS "PruneUnreferencedFiles" [st|Deleted #{tshow deletedChunks} chunks (#{tshow deletedChunkSize} bytes)|]


dispatchJobInjectFiles :: JobHandler UniWorX
dispatchJobInjectFiles = JobHandlerException . maybeT (return ()) $ do
  uploadBucket <- getsYesod $ view _appUploadCacheBucket
  interval <- getsYesod $ view _appInjectFiles
  now <- liftIO getCurrentTime

  let
    extractReference (Minio.ListItemObject oi)
      | Right bs <- Base64.decodeUnpadded . encodeUtf8 $ Minio.oiObject oi
      , Just fRef <- Crypto.digestFromByteString bs
      = Just (oi, fRef)
    extractReference (Minio.ListItemObject oi) = (oi, ) <$> Minio.oiObject oi ^? minioFileReference
    extractReference _ = Nothing

    injectOrDelete :: (Minio.Object, FileContentReference)
                   -> Handler (Sum Int64, Sum Int64) -- ^ Injected, Already existed
    injectOrDelete (obj, fRef) = maybeT (return mempty) $ do
      res <- hoist (startEvalMemoT . hoistStateCache (runDB . setSerializable)) $ do
        alreadyInjected <- lift . lift $ exists [ FileContentHash ==. fRef ]
        if | alreadyInjected -> return (mempty, Sum 1)
           | otherwise -> do
               content <- flip memo obj $ \obj' -> hoistMaybeM . runAppMinio . runMaybeT $ do
                 objRes <- catchIfMaybeT minioIsDoesNotExist $ Minio.getObject uploadBucket obj' Minio.defaultGetObjectOptions
                 lift . runConduit $ Minio.gorObjectStream objRes .| C.fold
                   -> Handler (Sum Int64) -- ^ Injected
    injectOrDelete (obj, fRef) = do
      fRef' <- runDB . setSerializable $ do
        chunkVar <- newEmptyTMVarIO
        dbAsync <- allocateLinkedAsync $ do
          atomically $ isEmptyTMVar chunkVar >>= guard . not
          sinkFileDB False $ C.unfoldM (\x -> fmap (, x) <$> atomically (takeTMVar chunkVar)) ()

               fmap ((, mempty) . Sum) . lift . lift . E.insertSelectCount $
                 let isReferenced = E.any E.exists $ fileReferences (E.val fRef)
                     now' = E.unsafeSqlCastAs "TIMESTAMP WITH TIME ZONE" $ E.val now
                 in return $ FileContent E.<# E.val fRef E.<&> E.val content E.<&> E.bool (E.just now') E.nothing isReferenced
      runAppMinio . maybeT (return ()) . catchIfMaybeT minioIsDoesNotExist $ Minio.removeObject uploadBucket obj
      return res
        didSend <- maybeT (return False) . hoistMaybeM . runAppMinio . runMaybeT $ do
          objRes <- catchIfMaybeT minioIsDoesNotExist $ Minio.getObject uploadBucket obj Minio.defaultGetObjectOptions
          lift . runConduit $ Minio.gorObjectStream objRes .| C.mapM_ (atomically . putTMVar chunkVar . Just)
          return True
        if
          | not didSend -> Nothing <$ cancel dbAsync
          | otherwise -> do
              atomically $ putTMVar chunkVar Nothing
              Just <$> waitAsync dbAsync
      let matchesFRef = is _Just $ assertM (== fRef) fRef'
      if | matchesFRef ->
             maybeT (return ()) . runAppMinio . handleIf minioIsDoesNotExist (const $ return ()) $ Minio.removeObject uploadBucket obj
         | otherwise ->
             $logErrorS "InjectFiles" [st|Minio object “#{obj}”'s content does not match its name (content hash: #{tshow fRef'} /= name hash: #{tshow fRef})|]
      return . bool mempty (Sum 1) $ is _Just fRef'

  (Sum inj, Sum exc) <-
  Sum inj <-
    runConduit $ transPipe runAppMinio (Minio.listObjects uploadBucket Nothing True)
      .| C.mapMaybe extractReference
      .| maybe (C.map id) (takeWhileTime . (/ 2)) interval
@ -118,7 +245,49 @@ dispatchJobInjectFiles = JobHandlerException . maybeT (return ()) $ do
      .| transPipe lift (C.mapM injectOrDelete)
      .| C.fold

  when (exc > 0) $
    $logInfoS "InjectFiles" [st|Deleted #{exc} files from upload cache because they were already injected|]
  when (inj > 0) $
    $logInfoS "InjectFiles" [st|Injected #{inj} files from upload cache into database|]
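
The rewritten injectOrDelete streams each upload-cache object exactly once: the bytes are pushed into a TMVar, a linked async drains the TMVar into sinkFileDB, and Nothing marks end of input. A self-contained sketch of that hand-off pattern; the module layout and helper name are illustrative, not part of the patch:

import Control.Concurrent.Async (withAsync, wait)
import Control.Concurrent.STM
import Data.ByteString (ByteString)
import Data.Conduit
import qualified Data.Conduit.Combinators as C
import qualified Data.Conduit.List as CL

handOff
  :: ConduitT () ByteString IO ()           -- producer, e.g. a MinIO object stream
  -> (ConduitT () ByteString IO () -> IO r) -- consumer, e.g. a database sink
  -> IO r
handOff src consume = do
  var <- newEmptyTMVarIO
  -- the consumer pulls chunks back out of the TMVar until it sees Nothing
  let pull = CL.unfoldM (\() -> fmap (\c -> (c, ())) <$> atomically (takeTMVar var)) ()
  withAsync (consume pull) $ \a -> do
    runConduit $ src .| C.mapM_ (atomically . putTMVar var . Just)
    atomically $ putTMVar var Nothing -- end-of-stream marker
    wait a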


data RechunkFileException
  = RechunkFileExceptionHashMismatch
      { oldHash, newHash :: FileContentReference }
  deriving (Eq, Ord, Show, Generic, Typeable)
  deriving anyclass (Exception)

dispatchJobRechunkFiles :: JobHandler UniWorX
dispatchJobRechunkFiles = JobHandlerAtomic . hoist lift $ do
  interval <- getsYesod $ view _appRechunkFiles
  let
    getEntryCandidates = E.selectSource . E.from $ \fileContentEntry -> E.distinctOnOrderBy [E.asc $ fileContentEntry E.^. FileContentEntryHash] $ do
      E.where_ . E.exists . E.from $ \(fileContentEntry' `E.InnerJoin` fileContentChunk) -> do
        E.on $ fileContentChunk E.^. FileContentChunkId E.==. fileContentEntry' E.^. FileContentEntryChunkHash
        E.where_ $ fileContentEntry' E.^. FileContentEntryHash E.==. fileContentEntry E.^. FileContentEntryHash
        E.where_ . E.not_ $ fileContentChunk E.^. FileContentChunkContentBased

      let size = E.subSelectMaybe . E.from $ \(fileContentEntry' `E.InnerJoin` fileContentChunk) -> do
            E.on $ fileContentChunk E.^. FileContentChunkId E.==. fileContentEntry' E.^. FileContentEntryChunkHash
            E.where_ $ fileContentEntry' E.^. FileContentEntryHash E.==. fileContentEntry E.^. FileContentEntryHash
            return $ E.sum_ (E.length_ $ fileContentChunk E.^. FileContentChunkContent :: E.SqlExpr (E.Value Word64))

      return ( fileContentEntry E.^. FileContentEntryHash
             , size
             )

    rechunkFile :: FileContentReference -> Word64 -> DB (Sum Natural, Sum Word64)
    rechunkFile fRef sz = do
      fRef' <- sinkFileDB True $ sourceFileDB fRef
      unless (fRef == fRef') $
        throwM $ RechunkFileExceptionHashMismatch fRef fRef'
      return (Sum 1, Sum sz)

  (Sum rechunkedEntries, Sum rechunkedSize) <- runConduit $
    getEntryCandidates
    .| C.mapMaybe (\(E.Value fRef, E.Value sz) -> (fRef, ) <$> sz)
    .| maybe (C.map id) (takeWhileTime . (/ 2)) interval
    .| persistentTokenBucketTakeC' TokenBucketRechunkFiles (view _2 :: _ -> Word64)
    .| C.mapM (uncurry rechunkFile)
    .| C.fold

  when (rechunkedEntries > 0 || rechunkedSize > 0) $
    $logInfoS "RechunkFiles" [st|Rechunked #{tshow rechunkedEntries} files in database (#{tshow rechunkedSize} bytes)|]
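
Rechunking streams each affected file out of the database and back through sinkFileDB, and throws RechunkFileExceptionHashMismatch if the whole-file digest changes. The invariant this depends on, sketched under the assumption that fastCDC only re-splits a stream without altering the concatenated bytes (imports assumed: Crypto.Hash, Crypto.Hash.Conduit, Data.Conduit.Algorithms.FastCDC):

rechunkPreservesHash
  :: Monad m
  => FastCDCParameters
  -> ConduitT () ByteString m () -- a re-runnable source of the file's bytes
  -> m Bool
rechunkPreservesHash params src = do
  before <- runConduit $ src .| Crypto.sinkHash
  after  <- runConduit $ src .| fastCDC params .| Crypto.sinkHash
  return ((before :: Digest SHA3_512) == after)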

@ -27,5 +27,5 @@ dispatchJobDeleteTransactionLogIPs = JobHandlerAtomic . hoist lift $ do
  retentionTime <- getsYesod $ view _appTransactionLogIPRetentionTime
  let cutoff = addUTCTime (- retentionTime) now

  n <- updateWhereCount [ TransactionLogTime <. cutoff ] [ TransactionLogRemote =. Nothing ]
  n <- updateWhereCount [ TransactionLogTime <. cutoff, TransactionLogRemote !=. Nothing ] [ TransactionLogRemote =. Nothing ]
  $logInfoS "DeleteTransactionLogIPs" [st|Deleted #{n} IP entries from transaction log|]

@ -82,7 +82,7 @@ writeJobCtlBlock = writeJobCtlBlock' writeJobCtl

queueJobUnsafe :: Bool -> Job -> YesodDB UniWorX (Maybe QueuedJobId)
queueJobUnsafe queuedJobWriteLastExec job = do
  $logInfoS "queueJob" $ tshow job
  $logDebugS "queueJob" $ tshow job

  doQueue <- fmap not . and2M (return $ jobNoQueueSame job) $ exists [ QueuedJobContent ==. toJSON job ]


@ -86,9 +86,13 @@ data Job = JobSendNotification { jRecipient :: UserId, jNotification :: Notifica
                               , jDisplayEmail :: UserEmail
                               }
         | JobPruneSessionFiles
         | JobPruneUnreferencedFiles
         | JobPruneUnreferencedFiles { jNumIterations
                                     , jEpoch
                                     , jIteration :: Natural
                                     }
         | JobInjectFiles
         | JobPruneFallbackPersonalisedSheetFilesKeys
         | JobRechunkFiles
  deriving (Eq, Ord, Show, Read, Generic, Typeable)
data Notification = NotificationSubmissionRated { nSubmission :: SubmissionId }
                  | NotificationSheetActive { nSheet :: SheetId }
@ -223,6 +227,7 @@ newWorkerId = JobWorkerId <$> liftIO newUnique
data JobContext = JobContext
  { jobCrontab :: TVar (Crontab JobCtl)
  , jobConfirm :: TVar (HashMap JobCtl (NonEmpty (TMVar (Maybe SomeException))))
  , jobHeldLocks :: TVar (Set QueuedJobId)
  }


@ -251,6 +256,8 @@ jobNoQueueSame = \case
  JobPruneSessionFiles{} -> True
  JobPruneUnreferencedFiles{} -> True
  JobInjectFiles{} -> True
  JobPruneFallbackPersonalisedSheetFilesKeys{} -> True
  JobRechunkFiles{} -> True
  _ -> False



@ -7,6 +7,8 @@ module Model.Migration

import Import.NoModel hiding (Max(..), Last(..))
import Model
import Settings
import Foundation.Type
import Jobs.Types
import Audit.Types
import Model.Migration.Version
@ -40,6 +42,8 @@ import qualified Data.CaseInsensitive as CI
import qualified Data.Aeson as Aeson

import Web.ServerSession.Backend.Persistent.Memcached (migrateMemcachedSqlStorage)

import Data.Conduit.Algorithms.FastCDC (FastCDCParameters(fastCDCMinBlockSize))

-- Database versions must follow https://pvp.haskell.org:
-- - Breaking changes are instances where manual migration is necessary (via customMigrations; i.e. changing a column's format)
@ -80,6 +84,7 @@ migrateAll' = sequence_
migrateAll :: ( MonadLogger m
              , MonadResource m
              , MonadUnliftIO m
              , MonadReader UniWorX m
              )
           => ReaderT SqlBackend m ()
migrateAll = do
@ -112,7 +117,7 @@ requiresMigration = mapReaderT (exceptT return return) $ do
    $logInfoS "Migration" $ intercalate "; " initial
    throwError True

  customs <- mapReaderT lift $ getMissingMigrations @_ @m
  customs <- mapReaderT lift $ getMissingMigrations @_ @(ReaderT UniWorX m)
  unless (Map.null customs) $ do
    $logInfoS "Migration" . intercalate ", " . map tshow $ Map.keys customs
    throwError True
@ -134,6 +139,7 @@ getMissingMigrations :: forall m m'.
  ( MonadLogger m
  , MonadIO m
  , MonadResource m'
  , MonadReader UniWorX m'
  )
  => ReaderT SqlBackend m (Map (Key AppliedMigration) (ReaderT SqlBackend m' ()))
getMissingMigrations = do
@ -180,7 +186,9 @@ migrateManual = do
-}

customMigrations :: forall m.
  MonadResource m
  ( MonadResource m
  , MonadReader UniWorX m
  )
  => Map (Key AppliedMigration) (ReaderT SqlBackend m ())
customMigrations = Map.fromListWith (>>)
  [ ( AppliedMigrationKey [migrationVersion|initial|] [version|0.0.0|]
@ -915,13 +923,32 @@ customMigrations = Map.fromListWith (>>)

    )
  , ( AppliedMigrationKey [migrationVersion|39.0.0|] [version|40.0.0|]
    , whenM (tableExists "study_features") $ do
    , whenM (tableExists "study_features")
        [executeQQ|
          ALTER TABLE study_features RENAME updated TO last_observed;
          ALTER TABLE study_features ADD COLUMN first_observed timestamp with time zone;
          UPDATE study_features SET first_observed = (SELECT MAX(last_observed) FROM study_features as other WHERE other."user" = study_features."user" AND other.degree = study_features.degree AND other.field = study_features.field AND other.type = study_features.type AND other.semester = study_features.semester - 1);
        |]
    )
  , ( AppliedMigrationKey [migrationVersion|40.0.0|] [version|41.0.0|]
    , whenM (tableExists "file_content") $ do
        chunkingParams <- lift $ view _appFileChunkingParams

        [executeQQ|
          ALTER TABLE file_content RENAME TO file_content_chunk;
          ALTER INDEX file_content_pkey RENAME TO file_content_chunk_pkey;

          CREATE TABLE file_content_chunk_unreferenced (id bigserial, hash bytea NOT NULL, since timestamp with time zone NOT NULL);
          INSERT INTO file_content_chunk_unreferenced (since, hash) (SELECT unreferenced_since as since, hash FROM file_content_chunk WHERE NOT (unreferenced_since IS NULL));
          ALTER TABLE file_content_chunk DROP COLUMN unreferenced_since;

          ALTER TABLE file_content_chunk ADD COLUMN content_based boolean NOT NULL DEFAULT false;
          UPDATE file_content_chunk SET content_based = true WHERE length(content) <= #{fastCDCMinBlockSize chunkingParams};

          CREATE TABLE file_content_entry (hash bytea NOT NULL, ix bigint NOT NULL, chunk_hash bytea NOT NULL);
          INSERT INTO file_content_entry (hash, chunk_hash, ix) (SELECT hash, hash as chunk_hash, 0 as ix FROM file_content_chunk);
        |]
    )
  ]


@ -54,5 +54,3 @@ type InstanceId = UUID
type ClusterId = UUID
type TokenId = UUID
type TermCandidateIncidence = UUID

type FileContentReference = Digest SHA3_512

@ -1,23 +1,129 @@
module Model.Types.File
  ( File(..), _fileTitle, _fileContent, _fileModified
  ( FileContentChunkReference(..), FileContentReference(..)
  , File(..), _fileTitle, _fileContent, _fileModified
  , PureFile, toPureFile, fromPureFile, pureFileToFileReference, _pureFileContent
  , transFile
  , minioFileReference
  , FileReference(..), _fileReferenceTitle, _fileReferenceContent, _fileReferenceModified
  , HasFileReference(..), IsFileReference(..), FileReferenceResidual(..)
  , HasFileReference(..), IsFileReference(..), FileReferenceResidual(FileReferenceResidual, FileReferenceResidualEither, unFileReferenceResidualEither, FileReferenceResidualEntity, fileReferenceResidualEntityKey, fileReferenceResidualEntityResidual, unPureFileResidual)
  ) where

import Import.NoModel
import Model.Types.Common (FileContentReference)

import Database.Persist.Sql (PersistFieldSql)
import Web.HttpApiData (ToHttpApiData, FromHttpApiData)
import Data.ByteArray (ByteArrayAccess)

import qualified Data.ByteString.Base64.URL as Base64
import qualified Data.ByteArray as ByteArray
import qualified Network.Minio as Minio (Object)
import qualified Crypto.Hash as Crypto (digestFromByteString)
import qualified Crypto.Hash.Conduit as Crypto (sinkHash)

import Utils.Lens.TH

import qualified Data.Conduit.Combinators as C

data File = File
import Text.Show



newtype FileContentChunkReference = FileContentChunkReference (Digest SHA3_512)
  deriving (Eq, Ord, Read, Show, Lift, Generic, Typeable)
  deriving newtype ( PersistField, PersistFieldSql
                   , PathPiece, ToHttpApiData, FromHttpApiData, ToJSON, FromJSON
                   , Hashable, NFData
                   , ByteArrayAccess
                   )

makeWrapped ''FileContentChunkReference

newtype FileContentReference = FileContentReference (Digest SHA3_512)
  deriving (Eq, Ord, Read, Show, Lift, Generic, Typeable)
  deriving newtype ( PersistField, PersistFieldSql
                   , PathPiece, ToHttpApiData, FromHttpApiData, ToJSON, FromJSON
                   , Hashable, NFData
                   , ByteArrayAccess
                   )

makeWrapped ''FileContentReference


minioFileReference :: Prism' Minio.Object FileContentReference
minioFileReference = prism' toObjectName fromObjectName
  where toObjectName = decodeUtf8 . Base64.encodeUnpadded . ByteArray.convert
        fromObjectName = fmap (review _Wrapped) . Crypto.digestFromByteString <=< preview _Right . Base64.decodeUnpadded . encodeUtf8
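
Usage sketch for the prism: review renders a content hash as its unpadded base64url object name, preview parses one back; the helper names here are illustrative:

objectNameFor :: FileContentReference -> Minio.Object
objectNameFor = review minioFileReference

referenceOf :: Minio.Object -> Maybe FileContentReference
referenceOf = preview minioFileReference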


data File m = File
  { fileTitle :: FilePath
  , fileContent :: Maybe ByteString
  , fileContent :: Maybe (ConduitT () ByteString m ())
  , fileModified :: UTCTime
  } deriving (Eq, Ord, Read, Show, Generic, Typeable)
  } deriving (Generic, Typeable)

makeLenses_ ''File

type PureFile = File Identity

_pureFileContent :: forall bs.
  ( IsSequence bs
  , Element bs ~ Word8
  )
  => Lens' PureFile (Maybe bs)
_pureFileContent = lens getPureFileContent setPureFileContent
  where
    getPureFileContent = fmap (repack . runIdentity . runConduit . (.| C.fold)) . fileContent
    setPureFileContent f bs = f { fileContent = yield . repack <$> bs }

toPureFile :: Monad m => File m -> m PureFile
toPureFile File{..} = do
  c <- for fileContent $ runConduit . (.| C.fold)
  return File
    { fileContent = fmap yield c
    , ..
    }

fromPureFile :: Monad m => PureFile -> File m
fromPureFile = transFile generalize
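
File is now parameterised over the monad its content streams in, and PureFile recovers the old by-value behaviour via Identity. A small usage sketch (title and content are made up; OverloadedStrings assumed for the ByteString literal):

examplePureFile :: UTCTime -> PureFile
examplePureFile t = File
  { fileTitle    = "hello.txt"
  , fileContent  = Just $ yield "hello" -- ConduitT () ByteString Identity ()
  , fileModified = t
  }

exampleFileM :: Monad m => UTCTime -> File m
exampleFileM = fromPureFile . examplePureFile -- generalise to any monad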

pureFileToFileReference :: PureFile -> FileReference
pureFileToFileReference File{..} = FileReference
  { fileReferenceTitle = fileTitle
  , fileReferenceContent = review _Wrapped . runIdentity . runConduit . (.| Crypto.sinkHash) <$> fileContent
  , fileReferenceModified = fileModified
  }

instance Eq PureFile where
  a == b = all (\f -> f a b)
    [ (==) `on` fileTitle
    , (==) `on` fileModified
    , (==) `on` (view _pureFileContent :: PureFile -> Maybe ByteString)
    ]
instance Ord PureFile where
  compare = mconcat
    [ comparing fileTitle
    , comparing (view _pureFileContent :: PureFile -> Maybe ByteString)
    , comparing fileModified
    ]
instance Show PureFile where
  showsPrec _ f@File{..}
    = showString "File{"
    . showString "fileTitle = "
    . shows fileTitle
    . showString ", "
    . showString "fileContent = "
    . (case f ^. _pureFileContent of
         Nothing -> showString "Nothing"
         Just c -> showString "Just $ yield " . showsPrec 11 (c :: ByteString)
      )
    . showString ", "
    . showString "fileModified = "
    . shows fileModified
    . showString "}"

transFile :: Monad m => (forall a. m a -> n a) -> (File m -> File n)
transFile l File{..} = File{ fileContent = transPipe l <$> fileContent, .. }

data FileReference = FileReference
  { fileReferenceTitle :: FilePath
  , fileReferenceContent :: Maybe FileContentReference
@ -36,6 +142,24 @@ instance HasFileReference FileReference where
  data FileReferenceResidual FileReference = FileReferenceResidual
  _FileReference = iso (, FileReferenceResidual) $ view _1

instance HasFileReference PureFile where
  newtype FileReferenceResidual PureFile = PureFileResidual { unPureFileResidual :: Maybe ByteString }
    deriving (Eq, Ord, Read, Show, Generic, Typeable)

  _FileReference = iso toFileReference fromFileReference
    where
      toFileReference File{..} = (FileReference{..}, PureFileResidual{..})
        where
          fileReferenceTitle = fileTitle
          (fileReferenceContent, unPureFileResidual) = ((,) <$> preview (_Just . _1) <*> preview (_Just . _2)) $
            over _1 (review _Wrapped) . runIdentity . runConduit . (.| getZipConduit ((,) <$> ZipConduit Crypto.sinkHash <*> ZipConduit C.fold)) <$> fileContent
          fileReferenceModified = fileModified
      fromFileReference (FileReference{..}, PureFileResidual{..}) = File
        { fileTitle = fileReferenceTitle
        , fileContent = yield <$> unPureFileResidual
        , fileModified = fileReferenceModified
        }

instance (HasFileReference a, HasFileReference b) => HasFileReference (Either a b) where
  newtype FileReferenceResidual (Either a b) = FileReferenceResidualEither { unFileReferenceResidualEither :: Either (FileReferenceResidual a) (FileReferenceResidual b) }
  _FileReference = iso doSplit doJoin

@ -267,6 +267,7 @@ instance Csv.FromField Sex where

data TokenBucketIdent = TokenBucketInjectFiles
                      | TokenBucketPruneFiles
                      | TokenBucketRechunkFiles
  deriving (Eq, Ord, Read, Show, Enum, Bounded, Generic, Typeable)
  deriving anyclass (Universe, Finite, Hashable)


@ -70,6 +70,8 @@ import Text.Show (showParen, showString)
import qualified Data.List.PointedList as P

import qualified Network.Minio as Minio

import Data.Conduit.Algorithms.FastCDC


-- | Runtime settings to configure this application. These settings can be
@ -113,7 +115,7 @@ data AppSettings = AppSettings
  , appMailSupport :: Address
  , appJobWorkers :: Natural
  , appJobFlushInterval :: Maybe NominalDiffTime
  , appJobCronInterval :: NominalDiffTime
  , appJobCronInterval :: Maybe NominalDiffTime
  , appJobStaleThreshold :: NominalDiffTime
  , appNotificationRateLimit :: NominalDiffTime
  , appNotificationCollateDelay :: NominalDiffTime
@ -140,8 +142,10 @@ data AppSettings = AppSettings
  , appLdapReTestFailover :: DiffTime

  , appSessionFilesExpire :: NominalDiffTime
  , appPruneUnreferencedFiles :: Maybe NominalDiffTime
  , appKeepUnreferencedFiles :: NominalDiffTime

  , appPruneUnreferencedFilesWithin :: Maybe NominalDiffTime
  , appPruneUnreferencedFilesInterval :: NominalDiffTime

  , appInitialLogSettings :: LogSettings

@ -172,6 +176,10 @@ data AppSettings = AppSettings
  , appUploadCacheConf :: Maybe Minio.ConnectInfo
  , appUploadCacheBucket :: Minio.Bucket
  , appInjectFiles :: Maybe NominalDiffTime
  , appRechunkFiles :: Maybe NominalDiffTime
  , appFileUploadDBChunksize :: Int

  , appFileChunkingParams :: FastCDCParameters

  , appFavouritesQuickActionsBurstsize
  , appFavouritesQuickActionsAvgInverseRate :: Word64
@ -444,7 +452,7 @@ instance FromJSON AppSettings where

    appJobWorkers <- o .: "job-workers"
    appJobFlushInterval <- o .:? "job-flush-interval"
    appJobCronInterval <- o .: "job-cron-interval"
    appJobCronInterval <- o .:? "job-cron-interval"
    appJobStaleThreshold <- o .: "job-stale-threshold"
    appNotificationRateLimit <- o .: "notification-rate-limit"
    appNotificationCollateDelay <- o .: "notification-collate-delay"
@ -471,9 +479,17 @@ instance FromJSON AppSettings where
    appLdapReTestFailover <- o .: "ldap-re-test-failover"

    appSessionFilesExpire <- o .: "session-files-expire"
    appPruneUnreferencedFiles <- o .:? "prune-unreferenced-files"
    appKeepUnreferencedFiles <- o .:? "keep-unreferenced-files" .!= 0
    appInjectFiles <- o .:? "inject-files"
    appRechunkFiles <- o .:? "rechunk-files"
    appFileUploadDBChunksize <- o .: "file-upload-db-chunksize"

    appFileChunkingTargetExponent <- o .: "file-chunking-target-exponent"
    appFileChunkingHashWindow <- o .: "file-chunking-hash-window"
    appFileChunkingParams <- maybe (fail "Could not recommend FastCDCParameters") return $ recommendFastCDCParameters appFileChunkingTargetExponent appFileChunkingHashWindow

    appPruneUnreferencedFilesWithin <- o .: "prune-unreferenced-files-within"
    appPruneUnreferencedFilesInterval <- o .: "prune-unreferenced-files-interval"

    appMaximumContentLength <- o .: "maximum-content-length"

src/Utils.hs

@ -56,7 +56,8 @@ import Control.Arrow as Utils ((>>>))
import Control.Monad.Trans.Except (ExceptT(..), throwE, runExceptT)
import Control.Monad.Except (MonadError(..))
import Control.Monad.Trans.Maybe as Utils (MaybeT(..))
import Control.Monad.Trans.Writer.Lazy (WriterT, execWriterT, tell)
import Control.Monad.Trans.Writer.Strict (execWriterT)
import Control.Monad.Writer.Class (MonadWriter(..))
import Control.Monad.Catch
import Control.Monad.Morph (hoist)
import Control.Monad.Fail
@ -83,6 +84,9 @@ import qualified Crypto.Saltine.Class as Saltine
import qualified Crypto.Data.PKCS7 as PKCS7
import Crypto.MAC.KMAC (KMAC, HashSHAKE)
import qualified Crypto.MAC.KMAC as KMAC
import qualified Crypto.Hash as Crypto
import Crypto.Hash (HashAlgorithm, Digest)
import Crypto.Hash.Instances ()

import Data.ByteArray (ByteArrayAccess)

@ -843,7 +847,7 @@ diffTimeout timeoutLength timeoutRes act = fromMaybe timeoutRes <$> timeout time
  = let (MkFixed micro :: Micro) = realToFrac timeoutLength
    in fromInteger micro

tellM :: (Monad m, Monoid x) => m x -> WriterT x m ()
tellM :: (MonadTrans t, MonadWriter x (t m), Monad m) => m x -> t m ()
tellM = tell <=< lift

-------------
@ -856,6 +860,19 @@ peekN n = do
  mapM_ leftover peeked
  return peeked

peekWhile :: forall a o m. Monad m => (a -> Bool) -> ConduitT a o m [a]
peekWhile p = do
  let go acc = do
        next <- await
        case next of
          Nothing -> return (reverse acc, Nothing)
          Just x
            | p x -> go $ x : acc
            | otherwise -> return (reverse acc, Just x)
  (peeked, failed) <- go []
  mapM_ leftover . reverse $ peeked ++ hoistMaybe failed -- leftovers are consumed LIFO; push back in reverse so the stream replays in order
  return peeked
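
Usage sketch: with the reverse above, peekWhile inspects a prefix without consuming anything, so a subsequent sink still sees the full stream in order (yieldMany and sinkList from Data.Conduit.Combinators):

demoPeekWhile :: ([Int], [Int])
demoPeekWhile = runConduitPure $
  C.yieldMany [1, 2, 5, 1 :: Int] .| do
    prefix <- peekWhile (< 3) -- [1,2]; all elements are pushed back
    rest   <- C.sinkList      -- still the whole stream: [1,2,5,1]
    return (prefix, rest)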

anyMC, allMC :: forall a o m. Monad m => (a -> m Bool) -> ConduitT a o m Bool
anyMC f = C.mapM f .| orC
allMC f = C.mapM f .| andC
@ -1057,6 +1074,12 @@ kmaclazy :: forall a string key ba chunk.
  -> KMAC a
kmaclazy str k = KMAC.finalize . KMAC.updates (KMAC.initialize @a str k) . toChunks

emptyHash :: forall a. HashAlgorithm a => Q (TExp (Digest a))
-- ^ Hash of `mempty`
--
-- Computationally preferable to computing the hash at runtime
emptyHash = TH.liftTyped $ Crypto.hashFinalize Crypto.hashInit
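
Usage sketch, mirroring the splice used in Utils.Files below for empty uploads: the digest of the empty input is baked in at compile time:

emptyFileRef :: FileContentReference
emptyFileRef = $$(liftTyped $ FileContentReference $$(emptyHash))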

-------------
-- Caching --
-------------

@ -3,6 +3,7 @@ module Utils.Files
  , sinkFile', sinkFiles'
  , FileUploads
  , replaceFileReferences, replaceFileReferences'
  , sinkFileDB
  ) where

import Import.NoFoundation
@ -11,31 +12,54 @@ import Handler.Utils.Minio
import qualified Network.Minio as Minio

import qualified Crypto.Hash as Crypto (hash)
import qualified Crypto.Hash.Conduit as Crypto (sinkHash)

import qualified Data.Conduit.Combinators as C

import qualified Data.ByteString.Base64.URL as Base64
import qualified Data.ByteArray as ByteArray
import qualified Data.Conduit.List as C (unfoldM)

import qualified Data.Map.Lazy as Map
import qualified Data.Set as Set
import Control.Monad.State.Class (modify)

import qualified Data.Sequence as Seq

import Database.Persist.Sql (deleteWhereCount)

import Control.Monad.Trans.Resource (allocate)

import qualified Data.UUID.V4 as UUID

sinkFiles :: (MonadThrow m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m, BackendCompatible SqlBackend (YesodPersistBackend UniWorX), YesodPersist UniWorX) => ConduitT File FileReference (SqlPersistT m) ()
sinkFiles = C.mapM sinkFile
import qualified Database.Esqueleto as E
import qualified Database.Esqueleto.Utils as E

import Data.Conduit.Algorithms.FastCDC (fastCDC)


sinkFileDB :: (MonadCatch m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m, BackendCompatible SqlBackend (YesodPersistBackend UniWorX), YesodPersist UniWorX)
           => Bool -- ^ Replace? Use only in serializable transaction
           -> ConduitT () ByteString (SqlPersistT m) ()
           -> SqlPersistT m FileContentReference
sinkFileDB doReplace fileContentContent = do
  chunkingParams <- getsYesod $ view _appFileChunkingParams

  let sinkChunk fileContentChunkContent = do
        fileChunkLockTime <- liftIO getCurrentTime
        fileChunkLockInstance <- getsYesod appInstanceID

        tellM $ Set.singleton <$> insert FileChunkLock{ fileChunkLockHash = fileContentChunkHash, .. }
        existsChunk <- lift $ exists [FileContentChunkHash ==. fileContentChunkHash]
        let setContentBased = updateWhere [FileContentChunkHash ==. fileContentChunkHash] [FileContentChunkContentBased =. fileContentChunkContentBased]
        if | existsChunk -> lift setContentBased
           | otherwise -> lift . handleIfSql isUniqueConstraintViolation (const setContentBased) $
               insert_ FileContentChunk{..}
        return $ FileContentChunkKey fileContentChunkHash
        where fileContentChunkHash = _Wrapped # Crypto.hash fileContentChunkContent
  ((review _Wrapped -> fileContentHash, fileContentChunks), chunkLocks) <- runConduit . runWriterC $
    transPipe lift fileContentContent
    .| fastCDC chunkingParams
    .| C.mapM (\c -> (c, ) <$> sinkChunk c)
    .| transPipe lift (getZipConduit $ (,) <$> ZipConduit (C.map (view _1) .| Crypto.sinkHash) <*> ZipConduit (C.foldMap $ views _2 Seq.singleton))
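
The final fan-out is a ZipConduit: the chunk stream is consumed by two sinks in a single pass, one hashing the concatenated bytes into the whole-file reference, the other collecting the chunk keys. The same shape in isolation, as a sketch with sinkList standing in for the Seq fold:

fanOut :: Monad m => ConduitT ByteString Void m (Digest SHA3_512, [ByteString])
fanOut = getZipConduit $
  (,) <$> ZipConduit Crypto.sinkHash -- digest of the whole stream
      <*> ZipConduit C.sinkList     -- every chunk, in order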

sinkFile :: (MonadThrow m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m, BackendCompatible SqlBackend (YesodPersistBackend UniWorX), YesodPersist UniWorX) => File -> SqlPersistT m FileReference
sinkFile File{ fileContent = Nothing, .. } = return FileReference
  { fileReferenceContent = Nothing
  , fileReferenceTitle = fileTitle
  , fileReferenceModified = fileModified
  }
sinkFile File{ fileContent = Just fileContentContent, .. } = do
  void . withUnliftIO $ \UnliftIO{..} ->
    let takeLock = do
          fileLockTime <- liftIO getCurrentTime
@ -44,35 +68,86 @@ sinkFile File{ fileContent = Just fileContentContent, .. } = do
        releaseLock lId = liftHandler . runDB $ (withReaderT projectBackend $ setSerializable (delete lId :: SqlPersistT (HandlerFor UniWorX) ()) :: YesodDB UniWorX ())
    in unliftIO $ allocate (unliftIO takeLock) (unliftIO . releaseLock)

  inDB <- exists [ FileContentHash ==. fileContentHash ]
  deleteWhere [ FileChunkLockId <-. Set.toList chunkLocks ]

  let sinkFileDB = unless inDB $ repsert (FileContentKey fileContentHash) FileContent{ fileContentUnreferencedSince = Nothing, .. }
  maybeT sinkFileDB $ do
    let uploadName = decodeUtf8 . Base64.encodeUnpadded $ ByteArray.convert fileContentHash
    uploadBucket <- getsYesod $ views appSettings appUploadCacheBucket
    unless inDB . runAppMinio $ do
      uploadExists <- handleIf minioIsDoesNotExist (const $ return False) $ True <$ Minio.statObject uploadBucket uploadName Minio.defaultGetObjectOptions
      unless uploadExists $ do
        let
          pooOptions = Minio.defaultPutObjectOptions
            { Minio.pooCacheControl = Just "immutable"
            }
        Minio.putObject uploadBucket uploadName (C.sourceLazy $ fromStrict fileContentContent) (Just . fromIntegral $ olength fileContentContent) pooOptions
        -- Note that MinIO does not accept zero-length uploads without an explicit length specification (i.e. the length argument above must not be `Nothing` for the API we use)
  let entryExists = E.selectExists . E.from $ \fileContentEntry -> E.where_ $ fileContentEntry E.^. FileContentEntryHash E.==. E.val fileContentHash
      insertEntries = handleIfSql isUniqueConstraintViolation (const $ return ()) . void $ insertMany_
        [ FileContentEntry{ fileContentEntryHash = fileContentHash, .. }
        | fileContentEntryChunkHash <- otoList fileContentChunks
        | fileContentEntryIx <- [0..]
        ]
  if | not doReplace -> unlessM entryExists insertEntries
     | otherwise -> do
         deleteWhere [ FileContentEntryHash ==. fileContentHash ]
         insertEntries


  return fileContentHash
  where fileContentChunkContentBased = True
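
The entry rows above are built with a parallel list comprehension (the two generator bars are zipped, not nested). An equivalent spelling without the extension, as a hypothetical helper:

fileEntries :: FileContentReference -> [FileContentChunkId] -> [FileContentEntry]
fileEntries fRef chunkKeys =
  [ FileContentEntry fRef ix chunkKey -- hash, ix, chunk_hash, as in the model
  | (ix, chunkKey) <- zip [0 ..] chunkKeys
  ]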


sinkFiles :: (MonadCatch m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m, BackendCompatible SqlBackend (YesodPersistBackend UniWorX), YesodPersist UniWorX) => ConduitT (File (SqlPersistT m)) FileReference (SqlPersistT m) ()
sinkFiles = C.mapM sinkFile

sinkFile :: (MonadCatch m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m, BackendCompatible SqlBackend (YesodPersistBackend UniWorX), YesodPersist UniWorX) => File (SqlPersistT m) -> SqlPersistT m FileReference
sinkFile File{ fileContent = Nothing, .. } = return FileReference
  { fileReferenceContent = Nothing
  , fileReferenceTitle = fileTitle
  , fileReferenceModified = fileModified
  }
sinkFile File{ fileContent = Just fileContentContent, .. } = do
  (unsealConduitT -> fileContentContent', isEmpty) <- fileContentContent $$+ is _Nothing <$> C.peekE

  fileContentHash <- if
    | not isEmpty -> maybeT (sinkFileDB False fileContentContent') $ do
        uploadBucket <- getsYesod $ views appSettings appUploadCacheBucket
        chunk <- liftIO newEmptyMVar
        let putChunks = do
              nextChunk <- await
              case nextChunk of
                Nothing
                  -> putMVar chunk Nothing
                Just nextChunk'
                  -> putMVar chunk (Just nextChunk') >> yield nextChunk' >> putChunks
        sinkAsync <- lift . allocateLinkedAsync . runConduit
          $ fileContentContent'
          .| putChunks
          .| Crypto.sinkHash

        runAppMinio $ do
          tmpUUID <- liftIO UUID.nextRandom
          let uploadName = ".tmp." <> toPathPiece tmpUUID
              pooOptions = Minio.defaultPutObjectOptions
                { Minio.pooCacheControl = Just "immutable"
                }
          Minio.putObject uploadBucket uploadName (C.unfoldM (\x -> fmap (, x) <$> takeMVar chunk) ()) Nothing pooOptions
        fileContentHash <- review _Wrapped <$> waitAsync sinkAsync
        let dstName = minioFileReference # fileContentHash
            copySrc = Minio.defaultSourceInfo
              { Minio.srcBucket = uploadBucket, Minio.srcObject = uploadName
              }
            copyDst = Minio.defaultDestinationInfo
              { Minio.dstBucket = uploadBucket
              , Minio.dstObject = dstName
              }
        uploadExists <- handleIf minioIsDoesNotExist (const $ return False) $ True <$ Minio.statObject uploadBucket dstName Minio.defaultGetObjectOptions
        unless uploadExists $
          Minio.copyObject copyDst copySrc
        Minio.removeObject uploadBucket uploadName
        return fileContentHash
    | otherwise -> return $$(liftTyped $ FileContentReference $$(emptyHash))

  return FileReference
    { fileReferenceContent = Just fileContentHash
    , fileReferenceTitle = fileTitle
    , fileReferenceModified = fileModified
    }
  where
    fileContentHash = Crypto.hash fileContentContent

sinkFiles' :: (MonadThrow m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m, BackendCompatible SqlBackend (YesodPersistBackend UniWorX), YesodPersist UniWorX, HasFileReference record) => ConduitT (File, FileReferenceResidual record) record (SqlPersistT m) ()
sinkFiles' :: (MonadCatch m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m, BackendCompatible SqlBackend (YesodPersistBackend UniWorX), YesodPersist UniWorX, HasFileReference record) => ConduitT (File (SqlPersistT m), FileReferenceResidual record) record (SqlPersistT m) ()
sinkFiles' = C.mapM $ uncurry sinkFile'

sinkFile' :: (MonadThrow m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m, BackendCompatible SqlBackend (YesodPersistBackend UniWorX), YesodPersist UniWorX, HasFileReference record) => File -> FileReferenceResidual record -> SqlPersistT m record
sinkFile' :: (MonadCatch m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m, BackendCompatible SqlBackend (YesodPersistBackend UniWorX), YesodPersist UniWorX, HasFileReference record) => File (SqlPersistT m) -> FileReferenceResidual record -> SqlPersistT m record
sinkFile' file residual = do
  reference <- sinkFile file
  return $ _FileReference # (reference, residual)

@ -1,22 +1,32 @@
module Utils.Sql
  ( setSerializable, setSerializable'
  , catchSql, handleSql
  , isUniqueConstraintViolation
  , catchIfSql, handleIfSql
  ) where

import ClassyPrelude.Yesod
import ClassyPrelude.Yesod hiding (handle)
import Numeric.Natural
import Settings.Log

import Database.PostgreSQL.Simple (SqlError)
import Database.PostgreSQL.Simple (SqlError(..))
import Database.PostgreSQL.Simple.Errors (isSerializationError)
import Control.Monad.Catch (MonadMask)
import Control.Monad.Catch

import Database.Persist.Sql
import Database.Persist.Sql.Raw.QQ

import qualified Data.ByteString as ByteString

import Control.Retry

import Control.Lens ((&))

import qualified Data.UUID as UUID
import Control.Monad.Random.Class (MonadRandom(getRandom))

import Text.Shakespeare.Text (st)


setSerializable :: forall m a. (MonadLogger m, MonadMask m, MonadIO m, ReadLogSettings (SqlPersistT m)) => SqlPersistT m a -> SqlPersistT m a
setSerializable = setSerializable' $ fullJitterBackoff 1e3 & limitRetriesByCumulativeDelay 10e6
@ -54,5 +64,29 @@ setSerializable' policy act = do
  transactionSaveWithIsolation ReadCommitted
  return res

catchSql :: forall m a. (MonadCatch m, MonadIO m) => SqlPersistT m a -> (SqlError -> SqlPersistT m a) -> SqlPersistT m a
catchSql = flip handleSql

handleSql :: forall m a. (MonadCatch m, MonadIO m) => (SqlError -> SqlPersistT m a) -> SqlPersistT m a -> SqlPersistT m a
handleSql recover act = do
  savepointName <- liftIO $ UUID.toString <$> getRandom

  let recover' :: SqlError -> SqlPersistT m a
      recover' exc = do
        rawExecute [st|ROLLBACK TO SAVEPOINT "#{savepointName}"|] []
        recover exc

  handle recover' $ do
    rawExecute [st|SAVEPOINT "#{savepointName}"|] []
    res <- act
    rawExecute [st|RELEASE SAVEPOINT "#{savepointName}"|] []
    return res

catchIfSql :: forall m a. (MonadCatch m, MonadIO m) => (SqlError -> Bool) -> SqlPersistT m a -> (SqlError -> SqlPersistT m a) -> SqlPersistT m a
catchIfSql p = flip $ handleIfSql p

handleIfSql :: forall m a. (MonadCatch m, MonadIO m) => (SqlError -> Bool) -> (SqlError -> SqlPersistT m a) -> SqlPersistT m a -> SqlPersistT m a
handleIfSql p recover = handleSql (\err -> bool throwM recover (p err) err)

isUniqueConstraintViolation :: SqlError -> Bool
isUniqueConstraintViolation SqlError{..} = "duplicate key value violates unique constraint" `ByteString.isPrefixOf` sqlErrorMsg
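
Usage sketch: because handleSql brackets the action with a savepoint, a failed statement can be recovered without poisoning the enclosing transaction. This is the pattern sinkFileDB relies on for concurrent chunk inserts; the helper name is illustrative:

insertChunkIdempotent :: (MonadCatch m, MonadIO m) => FileContentChunk -> SqlPersistT m ()
insertChunkIdempotent chunk =
  handleIfSql isUniqueConstraintViolation (const $ return ()) $ insert_ chunk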

stack.yaml

@ -47,6 +47,15 @@ extra-deps:
- filepath-crypto
- uuid-crypto

- git: https://github.com/gkleen/FastCDC.git
  commit: 7326e2931454282df9081105dad812845db5c530
  subdirs:
  - gearhash
  - fastcdc

- git: git@gitlab2.rz.ifi.lmu.de:uni2work/zip-stream.git
  commit: 843683d024f767de236f74d24a3348f69181a720

- generic-lens-1.2.0.0@sha256:b19e7970c93743a46bc3702331512a96d163de4356472f2d51a2945887aefe8c,6524 # manual downgrade; won't compile with >=2.0.0.0

- acid-state-0.16.0.1@sha256:d43f6ee0b23338758156c500290c4405d769abefeb98e9bc112780dae09ece6f,6207
@ -65,6 +74,7 @@ extra-deps:
- tz-0.1.3.4@sha256:bd311e202b8bdd15bcd6a4ca182e69794949d3b3b9f4aa835e9ccff011284979,5086
- unidecode-0.1.0.4@sha256:99581ee1ea334a4596a09ae3642e007808457c66893b587e965b31f15cbf8c4d,1144
- wai-middleware-prometheus-1.0.0@sha256:1625792914fb2139f005685be8ce519111451cfb854816e430fbf54af46238b4,1314
- primitive-0.7.1.0@sha256:6a237bb338bcc43193077ff8e8c0f0ce2de14c652231496a15672e8b563a07e2,2604

resolver: nightly-2020-08-08
compiler: ghc-8.10.2

@ -5,9 +5,6 @@

packages:
- completed:
    cabal-file:
      size: 4229
      sha256: 0dcfe3c4a67be4e96e1ae2e3c4b8744bc11e094853005a32f6074ab776caa3a9
    name: encoding
    version: 0.8.2
    git: git@gitlab2.rz.ifi.lmu.de:uni2work/encoding.git
@ -19,9 +16,6 @@ packages:
    git: git@gitlab2.rz.ifi.lmu.de:uni2work/encoding.git
    commit: 22fc3bb14841d8d50997aa47f1be3852e666f787
- completed:
    cabal-file:
      size: 2399
      sha256: 20cdf97602abb8fd7356c1a64c69fa857e34ab4cfe7834460d2ad783f7e4e4e3
    name: memcached-binary
    version: 0.2.0
    git: git@gitlab2.rz.ifi.lmu.de:uni2work/memcached-binary.git
@ -33,9 +27,6 @@ packages:
    git: git@gitlab2.rz.ifi.lmu.de:uni2work/memcached-binary.git
    commit: b7071df50bad3a251a544b984e4bf98fa09b8fae
- completed:
    cabal-file:
      size: 1423
      sha256: 49818ee0de2d55cbfbc15ca4de1761c3adac6ba3dfcdda960b413cad4f4fa47f
    name: conduit-resumablesink
    version: '0.3'
    git: git@gitlab2.rz.ifi.lmu.de:uni2work/conduit-resumablesink.git
@ -47,9 +38,6 @@ packages:
    git: git@gitlab2.rz.ifi.lmu.de:uni2work/conduit-resumablesink.git
    commit: cbea6159c2975d42f948525e03e12fc390da53c5
- completed:
    cabal-file:
      size: 2069
      sha256: 9192ac19ea5da3cd4b8c86a4266592aff7b9256311aa5f42ae6de94ccacf1366
    name: HaskellNet
    version: 0.5.1
    git: git://github.com/jtdaugherty/HaskellNet.git
@ -61,9 +49,6 @@ packages:
    git: git://github.com/jtdaugherty/HaskellNet.git
    commit: 5aa1f3b009253b02c4822005ac59ee208a10a347
- completed:
    cabal-file:
      size: 1934
      sha256: 9fbe7c3681e963eea213ab38be17966bb690788c1c55a67257916b677d7d2ec2
    name: HaskellNet-SSL
    version: 0.3.4.1
    git: git@gitlab2.rz.ifi.lmu.de:uni2work/HaskellNet-SSL.git
@ -75,9 +60,6 @@ packages:
    git: git@gitlab2.rz.ifi.lmu.de:uni2work/HaskellNet-SSL.git
    commit: 40393c938111ac78232dc2c7eec5edb4a22d03e8
- completed:
    cabal-file:
      size: 2208
      sha256: 48f6e03d8f812bd24e2601497ffe9c8a78907fa2266ba05abeefdfe99221617d
    name: ldap-client
    version: 0.4.0
    git: git@gitlab2.rz.ifi.lmu.de:uni2work/ldap-client.git
@ -90,9 +72,6 @@ packages:
    commit: 01afaf599ba6f8a9d804c269e91d3190b249d3f0
- completed:
    subdir: serversession
    cabal-file:
      size: 2081
      sha256: a958ff0007e5084e3e4c2a33acc9860c31186105f02f8ab99ecb847a7a8f9497
    name: serversession
    version: 1.0.1
    git: git@gitlab2.rz.ifi.lmu.de:uni2work/serversession.git
@ -106,9 +85,6 @@ packages:
    commit: 1c95b0100471279413485411032d639881012a5e
- completed:
    subdir: serversession-backend-acid-state
    cabal-file:
      size: 1875
      sha256: 6cc9d29e788334670bc102213a8aae73bc1b8b0a00c416f06d232376750443b7
    name: serversession-backend-acid-state
    version: 1.0.3
    git: git@gitlab2.rz.ifi.lmu.de:uni2work/serversession.git
@ -121,9 +97,6 @@ packages:
    git: git@gitlab2.rz.ifi.lmu.de:uni2work/serversession.git
    commit: 1c95b0100471279413485411032d639881012a5e
- completed:
    cabal-file:
      size: 1966
      sha256: 92d6d7be95a1ee6cb9783deb35e0d4e4959c7de20d518a45370084b57e20ba51
    name: xss-sanitize
    version: 0.3.6
    git: git@gitlab2.rz.ifi.lmu.de:uni2work/xss-sanitize.git
@ -136,9 +109,6 @@ packages:
    commit: 074ed7c8810aca81f60f2c535f9e7bad67e9d95a
- completed:
    subdir: colonnade
    cabal-file:
      size: 2020
      sha256: 28f603d097aee65ddf8fe032e7e0f87523a58c516253cba196922027c8fd54d5
    name: colonnade
    version: 1.2.0.2
    git: git@gitlab2.rz.ifi.lmu.de:uni2work/colonnade.git
@ -151,9 +121,6 @@ packages:
    git: git@gitlab2.rz.ifi.lmu.de:uni2work/colonnade.git
    commit: f8170266ab25b533576e96715bedffc5aa4f19fa
- completed:
    cabal-file:
      size: 9845
      sha256: 674630347209bc5f7984e8e9d93293510489921f2d2d6092ad1c9b8c61b6560a
    name: minio-hs
    version: 1.5.2
    git: git@gitlab2.rz.ifi.lmu.de:uni2work/minio-hs.git
@ -166,9 +133,6 @@ packages:
    commit: 42103ab247057c04c8ce7a83d9d4c160713a3df1
- completed:
    subdir: cryptoids-class
    cabal-file:
      size: 1155
      sha256: 1fa96858ded816798f8e1c77d7945185c0d7ceb2536185d39fc72496da8a0125
    name: cryptoids-class
    version: 0.0.0
    git: git@gitlab2.rz.ifi.lmu.de:uni2work/cryptoids.git
@ -182,9 +146,6 @@ packages:
    commit: 4d91394475b144ea5bf7ba111f93756cc0de8a3f
- completed:
    subdir: cryptoids-types
    cabal-file:
      size: 1214
      sha256: ee8966212554a156f2de236d4f005ff3a9d3098778ff6cc3f114ccaa0aff8825
    name: cryptoids-types
    version: 1.0.0
    git: git@gitlab2.rz.ifi.lmu.de:uni2work/cryptoids.git
@ -198,9 +159,6 @@ packages:
    commit: 4d91394475b144ea5bf7ba111f93756cc0de8a3f
- completed:
    subdir: cryptoids
    cabal-file:
      size: 1505
      sha256: fcf07cd0dca21db976c25cbdf4dcc5c747cebcb7bf14c05804c8ae14223f6046
    name: cryptoids
    version: 0.5.1.0
    git: git@gitlab2.rz.ifi.lmu.de:uni2work/cryptoids.git
@ -214,9 +172,6 @@ packages:
    commit: 4d91394475b144ea5bf7ba111f93756cc0de8a3f
- completed:
    subdir: filepath-crypto
    cabal-file:
      size: 1716
      sha256: 218da063bb7b00e3728deebf830904174b2b78bc29b3f203e6824b8caac92788
    name: filepath-crypto
    version: 0.1.0.0
    git: git@gitlab2.rz.ifi.lmu.de:uni2work/cryptoids.git
@ -230,9 +185,6 @@ packages:
    commit: 4d91394475b144ea5bf7ba111f93756cc0de8a3f
- completed:
    subdir: uuid-crypto
    cabal-file:
      size: 1460
      sha256: 1db54db1b85303e50cec3c99ddb8de6c9bedc388fa9ce5a1fce61520023b9ee5
    name: uuid-crypto
    version: 1.4.0.0
    git: git@gitlab2.rz.ifi.lmu.de:uni2work/cryptoids.git
@ -244,6 +196,43 @@ packages:
    subdir: uuid-crypto
    git: git@gitlab2.rz.ifi.lmu.de:uni2work/cryptoids.git
    commit: 4d91394475b144ea5bf7ba111f93756cc0de8a3f
- completed:
    subdir: gearhash
    name: gearhash
    version: 0.0.0
    git: https://github.com/gkleen/FastCDC.git
    pantry-tree:
      size: 504
      sha256: 61a08cdd003dc8a418f410dacbcc3a91adea4c23864f8ddbaba3b762e4c2924d
    commit: 7326e2931454282df9081105dad812845db5c530
  original:
    subdir: gearhash
    git: https://github.com/gkleen/FastCDC.git
    commit: 7326e2931454282df9081105dad812845db5c530
- completed:
    subdir: fastcdc
    name: fastcdc
    version: 0.0.0
    git: https://github.com/gkleen/FastCDC.git
    pantry-tree:
      size: 244
      sha256: 80ae3d79344f7c6a73a8d7adf07ff2b7f0736fd34760b4b8a15e26f32d9773f6
    commit: 7326e2931454282df9081105dad812845db5c530
  original:
    subdir: fastcdc
    git: https://github.com/gkleen/FastCDC.git
    commit: 7326e2931454282df9081105dad812845db5c530
- completed:
    name: zip-stream
    version: 0.2.0.1
    git: git@gitlab2.rz.ifi.lmu.de:uni2work/zip-stream.git
    pantry-tree:
      size: 812
      sha256: 0da8bc38d73034962d2e2d1a7586b6dee848a629319fce9cbbf578348c61118c
    commit: 843683d024f767de236f74d24a3348f69181a720
  original:
    git: git@gitlab2.rz.ifi.lmu.de:uni2work/zip-stream.git
    commit: 843683d024f767de236f74d24a3348f69181a720
- completed:
    hackage: generic-lens-1.2.0.0@sha256:b19e7970c93743a46bc3702331512a96d163de4356472f2d51a2945887aefe8c,6524
    pantry-tree:
@ -363,6 +352,13 @@ packages:
      sha256: 6d64803c639ed4c7204ea6fab0536b97d3ee16cdecb9b4a883cd8e56d3c61402
  original:
    hackage: wai-middleware-prometheus-1.0.0@sha256:1625792914fb2139f005685be8ce519111451cfb854816e430fbf54af46238b4,1314
- completed:
    hackage: primitive-0.7.1.0@sha256:6a237bb338bcc43193077ff8e8c0f0ce2de14c652231496a15672e8b563a07e2,2604
    pantry-tree:
      size: 1376
      sha256: 924e88629b493abb6b2f3c3029cef076554a2b627091e3bb6887ec03487a707d
  original:
    hackage: primitive-0.7.1.0@sha256:6a237bb338bcc43193077ff8e8c0f0ce2de14c652231496a15672e8b563a07e2,2604
snapshots:
- completed:
    size: 524392

@ -5,7 +5,6 @@ module Database.Fill
import "uniworx" Import hiding (Option(..), currentYear)
import Handler.Utils.Form (SheetGrading'(..), SheetType'(..), SheetGroup'(..))

import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as LBS
import qualified Data.Text as Text
-- import Data.Text.IO (hPutStrLn)
@ -30,6 +29,8 @@ import qualified Data.Csv as Csv
import Crypto.Random (getRandomBytes)
import Data.List (genericLength)

import qualified Data.Conduit.Combinators as C


testdataDir :: FilePath
testdataDir = "testdata"
@ -37,7 +38,7 @@ testdataDir = "testdata"

insertFile :: ( HasFileReference fRef, PersistRecordBackend fRef SqlBackend ) => FileReferenceResidual fRef -> FilePath -> DB (Key fRef)
insertFile residual fileTitle = do
  fileContent <- liftIO . fmap Just . BS.readFile $ testdataDir </> fileTitle
  let fileContent = Just . C.sourceFile $ testdataDir </> fileTitle
  fileModified <- liftIO getCurrentTime
  sinkFile' File{..} residual >>= insert
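The insertFile change above swaps a strict read for a deferred conduit source. A minimal self-contained sketch of the difference (the File/sinkFile' plumbing is project-specific and omitted; helper names here are illustrative only): BS.readFile forces the whole file into memory up front, while C.sourceFile merely describes the read, so bytes flow only as the sink pulls them.

import qualified Data.ByteString as BS
import Data.Conduit (ConduitT)
import qualified Data.Conduit.Combinators as C
import Control.Monad.Trans.Resource (MonadResource)

-- Old shape: the entire file is resident in memory before anything is sunk.
readEager :: FilePath -> IO (Maybe BS.ByteString)
readEager path = Just <$> BS.readFile path

-- New shape: only a description of the read; bytes stream on demand.
readStreaming :: MonadResource m => FilePath -> Maybe (ConduitT i BS.ByteString m ())
readStreaming path = Just (C.sourceFile path)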
@ -16,8 +16,6 @@ import qualified Data.Conduit.Combinators as C
import Control.Lens.Extras
import Control.Monad.Trans.Maybe

import qualified Crypto.Hash as Crypto (hash)

import System.FilePath (dropDrive)

import Data.Time.Clock (diffUTCTime)
@ -25,6 +23,8 @@ import Data.Char (chr)

import Database.Persist.Sql (transactionUndo)

import Data.Bitraversable


instance Arbitrary (FileReferenceResidual PersonalisedSheetFile) where
  arbitrary = PersonalisedSheetFileResidual
@ -59,7 +59,7 @@ spec = withApp . describe "Personalised sheet file zip encoding" $ do
            lift (insertUnique user) >>= maybe userLoop return
        in userLoop
      let res = res' { personalisedSheetFileResidualSheet = shid, personalisedSheetFileResidualUser = uid }
      fRef <- lift (sinkFile f :: DB FileReference)
      fRef <- lift (sinkFile (transFile generalize f) :: DB FileReference)
      now <- liftIO getCurrentTime
      void . lift . insert $ CourseParticipant cid (personalisedSheetFileResidualUser res) now Nothing CourseParticipantActive
      void . lift . insert $ _FileReference # (fRef, res)
@ -68,30 +68,38 @@ spec = withApp . describe "Personalised sheet file zip encoding" $ do
      anonMode <- liftIO $ generate arbitrary

      let
        fpL :: Lens' (Either PersonalisedSheetFile File) FilePath
        fpL :: forall m. Lens' (Either PersonalisedSheetFile (File m)) FilePath
        fpL = lens (either personalisedSheetFileTitle fileTitle) $ \f' path -> case f' of
          Left pf -> Left pf { personalisedSheetFileTitle = path }
          Right f -> Right f { fileTitle = path }
        isDirectory = either (is _Nothing . personalisedSheetFileContent) (is _Nothing . fileContent)

        loadFile :: Either (PersonalisedSheetFileUnresolved (Either PersonalisedSheetFile DBFile)) (Either PersonalisedSheetFile DBFile, FileReferenceResidual PersonalisedSheetFile)
                 -> DB (Either (PersonalisedSheetFileUnresolved (Either PersonalisedSheetFile PureFile)) (Either PersonalisedSheetFile PureFile, FileReferenceResidual PersonalisedSheetFile))
        loadFile = bitraverse loadUnresolved loadResolved
          where
            loadUnresolved = traverse $ traverse toPureFile
            loadResolved (f, fRes) = (, fRes) <$> traverse toPureFile f

      recoveredFiles <- runConduit $
        sourcePersonalisedSheetFiles cid (Just shid) Nothing anonMode
          .| resolvePersonalisedSheetFiles fpL isDirectory cid shid
          .| C.mapM loadFile
          .| C.foldMap pure

      let
        checkFile :: Either (PersonalisedSheetFileUnresolved (Either PersonalisedSheetFile File)) (Either PersonalisedSheetFile File, FileReferenceResidual PersonalisedSheetFile)
                  -> (File, FileReferenceResidual PersonalisedSheetFile)
        checkFile :: Either (PersonalisedSheetFileUnresolved (Either PersonalisedSheetFile PureFile)) (Either PersonalisedSheetFile PureFile, FileReferenceResidual PersonalisedSheetFile)
                  -> (PureFile, FileReferenceResidual PersonalisedSheetFile)
                  -> Bool
        checkFile (Left _) _
          = False
        checkFile (Right (recFile, recResidual)) (file, residual)
          = recResidual == residual
          && case recFile of
            Right f -> file == f
            Left pf -> dropDrive (fileTitle file) == dropDrive (personalisedSheetFileTitle pf)
              && abs (fileModified file `diffUTCTime` personalisedSheetFileModified pf) < 1e-6 -- Precision is a PostgreSQL limitation
              && fmap Crypto.hash (fileContent file) == personalisedSheetFileContent pf
            Right f -> f == file
            Left pf -> dropDrive (fileTitle file) == dropDrive (personalisedSheetFileTitle pf)
              && abs (fileModified file `diffUTCTime` personalisedSheetFileModified pf) < 1e-6 -- Precision is a PostgreSQL limitation
              && fileReferenceContent (pureFileToFileReference file) == personalisedSheetFileContent pf

        errors = go [] sheetFiles recoveredFiles
          where go acc xs [] = reverse acc ++ map Left xs
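The spec now distinguishes streaming files (File m, whose content is a conduit source in some monad m) from fully materialised ones (PureFile), converting with toPureFile and transFile generalize. The real definitions live in the application; the following is a rough sketch of the shapes this diff implies, with the field layout assumed rather than copied from the codebase:

{-# LANGUAGE RankNTypes #-}

import Data.ByteString (ByteString)
import Data.Conduit (ConduitT, transPipe)
import Data.Time (UTCTime)

-- Hypothetical reconstruction: content is an optional byte source in m.
data File m = File
  { fileTitle    :: FilePath
  , fileModified :: UTCTime
  , fileContent  :: Maybe (ConduitT () ByteString m ())
  }

-- Changing the underlying monad is then just transPipe over the content,
-- e.g. transFile generalize to lift a pure file into a DB-compatible one.
transFile :: Monad m => (forall a. m a -> n a) -> File m -> File n
transFile nat f = f { fileContent = transPipe nat <$> fileContent f }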
@ -15,6 +15,8 @@ import Text.Shakespeare.I18N (renderMessage)

import Utils.Lens (_ratingValues, _ratingPoints)

import qualified Data.Conduit.Combinators as C


spec :: Spec
spec = describe "Rating file parsing/pretty-printing" $ do
@ -33,7 +35,7 @@ spec = describe "Rating file parsing/pretty-printing" $ do
      mr' = MsgRenderer $ renderMessage (error "foundation inspected" :: site) []

      parseRating' :: LBS.ByteString -> Maybe Rating'
      parseRating' = either (\(_ :: SomeException) -> Nothing) (Just . fst) . parseRating . flip (File "bewertung.txt") time . Just . LBS.toStrict
      parseRating' = either (\(_ :: SomeException) -> Nothing) (Just . fst) . parseRating . flip (File "bewertung.txt") time . Just . C.sourceLazy

      time = UTCTime systemEpochDay 0
      mRating rating = rating { ratingValues = mRating' rating $ ratingValues rating }
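parseRating' now feeds the rating file to the parser as a stream: C.sourceLazy turns a lazy ByteString into a source of its strict chunks, so in-memory test input enters the pipeline the same way file data would. A small standalone sketch of that round trip:

import qualified Data.ByteString.Lazy as LBS
import Data.Conduit (runConduit, (.|))
import qualified Data.Conduit.Combinators as C

-- sourceLazy yields the lazy ByteString's chunks without copying them;
-- sinkLazy reassembles them, so this is the identity on the bytes.
roundTrip :: LBS.ByteString -> IO LBS.ByteString
roundTrip input = runConduit $ C.sourceLazy input .| C.sinkLazy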
@ -2,10 +2,11 @@ module Handler.Utils.ZipSpec where

import TestImport

import Utils ((<//>))
import Handler.Utils.Zip

import Data.Conduit
import qualified Data.Conduit.List as Conduit
import qualified Data.Conduit.Combinators as C

import Data.List (dropWhileEnd)

@ -14,16 +15,59 @@ import ModelSpec ()
import System.FilePath
import Data.Time

import Data.Universe


data ZipConsumptionStrategy
  = ZipConsumeInterleaved
  | ZipConsumeBuffered
  deriving (Eq, Ord, Enum, Bounded, Read, Show, Generic, Typeable)
  deriving anyclass (Universe, Finite)

data ZipProductionStrategy
  = ZipProduceInterleaved
  | ZipProduceBuffered
  deriving (Eq, Ord, Enum, Bounded, Read, Show, Generic, Typeable)
  deriving anyclass (Universe, Finite)


spec :: Spec
spec = describe "Zip file handling" $ do
  it "has compatible encoding/decoding to/from zip files" . property $ do
    zipFiles <- listOf $ scale (`div` 2) arbitrary
    return . property $ do
      zipFiles' <- runConduit $ Conduit.sourceList zipFiles .| produceZip def .| void consumeZip .| Conduit.consume
      forM_ (zipFiles `zip` zipFiles') $ \(file, file') -> do
        let acceptableFilenameChanges
              = makeValid . dropWhile isPathSeparator . bool (dropWhileEnd isPathSeparator) addTrailingPathSeparator (isNothing $ fileContent file) . normalise . makeValid
  describe "has compatible encoding to, and decoding from zip files" . forM_ universeF $ \strategy ->
    modifyMaxSuccess (bool id (37 *) $ strategy == (ZipProduceInterleaved, ZipConsumeInterleaved)) . it (show strategy) . property $ do
      zipFiles <- listOf arbitrary :: Gen [PureFile]
      return . property $ do

        let zipProduceBuffered
              = evaluate . force <=< runConduitRes $ zipProduceInterleaved .| C.sinkLazy
            zipProduceInterleaved
              = C.yieldMany zipFiles .| C.map fromPureFile .| produceZip def
            zipConsumeBuffered zipProd
              = mapM toPureFile <=< runConduitRes $ void (consumeZip zipProd) .| C.foldMap pure
            zipConsumeInterleaved zipProd
              = void (consumeZip zipProd) .| C.mapM toPureFile .| C.foldMap pure
        zipFiles' <- case strategy of
          (ZipProduceBuffered, ZipConsumeInterleaved) ->
            runConduitRes . zipConsumeInterleaved . C.sourceLazy =<< zipProduceBuffered
          (ZipProduceBuffered, ZipConsumeBuffered) ->
            zipConsumeBuffered . C.sourceLazy =<< zipProduceBuffered
          (ZipProduceInterleaved, ZipConsumeInterleaved) ->
            runConduitRes $ zipConsumeInterleaved zipProduceInterleaved
          (ZipProduceInterleaved, ZipConsumeBuffered) ->
            zipConsumeBuffered zipProduceInterleaved

        let acceptableFilenameChanges file
              = "." <//> fileTitle file
              & normalise
              & makeValid
              & dropWhile isPathSeparator
              & dropWhileEnd isPathSeparator
              & bool id addTrailingPathSeparator (isNothing $ fileContent file)
              & normalise
              & makeValid
            acceptableTimeDifference t1 t2 = abs (diffUTCTime t1 t2) <= 2
        (shouldBe `on` acceptableFilenameChanges) (fileTitle file') (fileTitle file)
        (fileModified file', fileModified file) `shouldSatisfy` uncurry acceptableTimeDifference
        (fileContent file') `shouldBe` (fileContent file)

        forM_ (zipFiles `zip` zipFiles') $ \(file, file') -> do
          (shouldBe `on` acceptableFilenameChanges) file' file
          (fileModified file', fileModified file) `shouldSatisfy` uncurry acceptableTimeDifference
          (view _pureFileContent file' :: Maybe ByteString) `shouldBe` (view _pureFileContent file)
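The four strategy combinations above differ only in whether the archive bytes (production side) and the extracted files (consumption side) are held fully in memory between the two halves of the round trip. The buffering step is the interesting part; a sketch of its essence, detached from the project's produceZip/consumeZip (force from deepseq guarantees the stream really has been drained before the consumer starts):

import Control.DeepSeq (force)
import Control.Exception (evaluate)
import Data.ByteString (ByteString)
import qualified Data.ByteString.Lazy as LBS
import Data.Conduit (ConduitT, runConduitRes, (.|))
import qualified Data.Conduit.Combinators as C
import Control.Monad.Trans.Resource (ResourceT)

-- Buffered: run the producer to completion, fully evaluate the result, and
-- only then hand the bytes to the consumer. The interleaved strategies skip
-- this step and connect producer and consumer in one streaming pipeline.
buffer :: ConduitT () ByteString (ResourceT IO) () -> IO LBS.ByteString
buffer producer = evaluate . force =<< runConduitRes (producer .| C.sinkLazy)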
@ -34,6 +34,10 @@ import Control.Monad.Catch.Pure (Catch, runCatch)

import System.IO.Unsafe (unsafePerformIO)

import qualified Data.Conduit.Combinators as C

import Data.Ratio ((%))


instance Arbitrary EmailAddress where
  arbitrary = do
@ -137,12 +141,18 @@ instance Arbitrary User where
    return User{..}
  shrink = genericShrink

instance Arbitrary File where
scaleRatio :: Rational -> Int -> Int
scaleRatio r = ceiling . (* r) . fromIntegral

instance Monad m => Arbitrary (File m) where
  arbitrary = do
    fileTitle <- scale (`div` 2) $ (joinPath <$> arbitrary) `suchThat` (any $ not . isPathSeparator)
    fileTitle <- scale (scaleRatio $ 1 % 8) $ (joinPath <$> arbitrary) `suchThat` (any $ not . isPathSeparator)
    date <- addDays <$> arbitrary <*> pure (fromGregorian 2043 7 2)
    fileModified <- (addUTCTime <$> arbitrary <*> pure (UTCTime date 0)) `suchThat` inZipRange
    fileContent <- arbitrary
    fileContent <- oneof
      [ pure Nothing
      , Just . C.sourceLazy <$> scale (scaleRatio $ 7 % 8) arbitrary
      ]
    return File{..}
    where
      inZipRange :: UTCTime -> Bool
@ -152,7 +162,6 @@ instance Arbitrary File where
          = True
        | otherwise
          = False
  shrink = genericShrink

instance Arbitrary School where
  arbitrary = do
@ -177,8 +186,8 @@ spec = do
  parallel $ do
    lawsCheckHspec (Proxy @User)
      [ eqLaws, jsonLaws ]
    lawsCheckHspec (Proxy @File)
      [ eqLaws ]
    lawsCheckHspec (Proxy @PureFile)
      [ eqLaws, ordLaws ]
    lawsCheckHspec (Proxy @School)
      [ eqLaws ]
    lawsCheckHspec (Proxy @Term)
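scaleRatio above replaces the old uniform scale (`div` 2) so the QuickCheck size budget is split deliberately between the two generators: one eighth for the path, seven eighths for the content. A worked example of the arithmetic (scaleRatio copied verbatim from the diff):

import Data.Ratio ((%))

scaleRatio :: Rational -> Int -> Int
scaleRatio r = ceiling . (* r) . fromIntegral

-- At QuickCheck size 100 the title generator runs at size 13 and the
-- content generator at size 88, instead of both at 50 as before:
sizes :: (Int, Int)
sizes = (scaleRatio (1 % 8) 100, scaleRatio (7 % 8) 100)  -- (13, 88)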
@ -22,6 +22,7 @@ import Yesod.Auth as X
import Yesod.Test as X
import Yesod.Core.Unsafe (fakeHandlerGetLogger)
import Test.QuickCheck as X
import Test.Hspec.QuickCheck as X hiding (prop)
import Test.QuickCheck.Gen as X
import Data.Default as X
import Test.QuickCheck.Instances as X ()
@ -65,6 +66,8 @@ import Handler.Utils (runAppLoggingT)
import Web.PathPieces (toPathPiece)
import Utils.Parameters (GlobalPostParam(PostLoginDummy))

import Control.Monad.Morph as X (generalize)


runDB :: SqlPersistM a -> YesodExample UniWorX a
runDB query = do
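The newly re-exported generalize (from the mmorph package) is the natural transformation the specs pass to transFile: it lifts a pure Identity computation into any monad. In isolation:

import Control.Monad.Morph (generalize)
import Data.Functor.Identity (Identity (..))

-- generalize :: Monad m => Identity a -> m a
example :: IO Int
example = generalize (Identity 42)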