multi color filter tail

root / TailHandle.hs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
{-# LANGUAGE CPP, DeriveDataTypeable, OverloadedStrings, ForeignFunctionInterface #-}
module TailHandle
  ( runTail
  ) where

import Control.Concurrent (ThreadId, myThreadId, threadWaitRead, yield, killThread)
import Control.Concurrent.MVar (newEmptyMVar, readMVar)
import Control.Exception (Exception(..), asyncExceptionToException, asyncExceptionFromException, catch, throwTo, mask)
import Control.Monad ((>=>), guard, when, unless, forever)
import qualified Data.ByteString.Char8 as BS
import Data.Functor (($>))
import qualified Data.HashSet as Set
import Data.List (genericTake, genericLength)
import Data.Maybe (isJust)
import Data.Monoid ((<>))
import Data.Typeable (Typeable)
import Foreign.C.Types (CInt(..))
import Foreign.C.Error (throwErrnoIfNull)
import Foreign.Marshal.Alloc (allocaBytes)
import Foreign.Ptr (Ptr, castPtr)
import System.FilePath ((</>))
#ifdef INOTIFY
import qualified System.INotify as INotify
#endif
import System.IO.Error (isDoesNotExistError, isEOFError, isFullError)
import System.IO (SeekMode(AbsoluteSeek))
import System.Posix.Directory (DirStream, closeDirStream, readDirStream, rewindDirStream)
import System.Posix.Files (getFileStatus, getFdStatus, isRegularFile, isNamedPipe, isSocket, isCharacterDevice, isDirectory, fileID, fileSize)
import System.Posix.IO (openFd, OpenMode(ReadOnly), OpenFileFlags(..), fdReadBuf, fdSeek, setFdOption, FdOption(NonBlockingRead), closeFd)
import System.Posix.Types (Fd(..), FileOffset, FileID)
import Unsafe.Coerce (unsafeCoerce)

import Util
import TailTypes
import Tail

-- | State of one followed target.  A value is created by 'newTail' and then
-- passed through the open, read, wait and signal steps, each of which returns
-- an updated copy.
data TailHandle = TailHandle
  { thTail :: Tail
    -- ^ The tail specification this handle follows.
  , thRuntime :: TailRuntime
    -- ^ Runtime passed to 'newTail'; used for error messages and for
    -- registering sub-tails of a directory.
  , thPoll :: ThreadId
    -- ^ Thread that created the handle; signals are thrown to this thread.
  , thReopen :: Maybe ThreadId
    -- ^ Periodic reopen thread, forked only when the reopen interval is
    -- non-zero.
  , thFd :: Maybe Fd
    -- ^ Open descriptor, or Nothing while the target is closed.
  , thPos :: FileOffset
    -- ^ Read offset within a regular file, or -1 once opened on a pipe,
    -- socket or character device.  Before the first open it holds the
    -- requested start (0 for the beginning, negative to count from the end).
  , thIno :: Maybe FileID
    -- ^ File ID recorded at open time; compared on reopen to detect that the
    -- path now names a different file.
  , thBuf :: BS.ByteString
    -- ^ Trailing bytes read so far that are not yet terminated by a newline.
  , thDirStream :: Maybe DirStream
    -- ^ Directory stream when the open target is a directory.
  , thDirList :: Set.HashSet FilePath
    -- ^ Directory entry names currently known for a directory target.
  , thAgain :: Bool
    -- ^ True when the next wait should only yield instead of sleeping.
#ifdef INOTIFY
  , thPollWatch :: Maybe INotify.WatchDescriptor
    -- ^ Watch that reports content or entry changes, if installed.
  , thReopenWatch :: Maybe INotify.WatchDescriptor
    -- ^ Watch that reports the target being moved or deleted, if installed.
#endif
  }

-- | Notifications delivered to the tail thread with 'throwTo'.  They are
-- thrown by the periodic reopen thread and by the inotify callback.
data TailSignal 
  = SignalPoll -- ^ the target changed; read again without sleeping
  | SignalReopen -- ^ check whether the path now names a different file
  | SignalInsert String Bool -- ^ a directory entry appeared; the flag is True for a created entry and False for one moved in
  | SignalDelete String -- ^ a directory entry was deleted or moved out
  deriving (Show, Typeable, Eq, Ord)
-- | Placed in the asynchronous-exception hierarchy by routing both
-- conversions through the async-exception wrappers.
instance Exception TailSignal where
  toException = asyncExceptionToException
  fromException = asyncExceptionFromException

-- | Emit an error message on behalf of a handle by forwarding the runtime
-- and tail specification stored in it to 'tailErrMsg'.
thErrMsg :: TailHandle -> BS.ByteString -> IO ()
thErrMsg TailHandle{ thRuntime = runtime, thTail = spec } = tailErrMsg runtime spec

-- | Run an action, yielding its result in Just, or Nothing when it fails
-- with an error accepted by 'isDoesNotExistError'.
-- NOTE(review): relies on the project helper catchWhen taking a predicate,
-- the action and a fallback, in that order -- confirm against Util.
catchDoesNotExist :: IO a -> IO (Maybe a)
catchDoesNotExist action = catchWhen isDoesNotExistError found missing
  where
    found = fmap Just action
    missing = pure Nothing

-- | Fail in IO with a user error carrying the given message.
bad :: String -> IO a
bad msg = ioError (userError msg)

-- | Release everything attached to an open tail and return the handle in its
-- closed state.  A handle that is already closed is returned unchanged.
--
-- Any inotify watches are removed before the descriptor is released.  When
-- the descriptor was wrapped in a directory stream, the stream is closed with
-- 'closeDirStream', which also releases the descriptor it was built from;
-- otherwise the descriptor is closed directly.  The directory stream and the
-- known-entry set are cleared so that a later open of the same path as a
-- regular file is not read through a stale directory stream.
--
-- The position is reset to 0 so that a subsequent open starts from the
-- beginning of whatever the path names next.
closeTail :: TailHandle -> IO TailHandle
closeTail th@TailHandle{ thFd = Nothing } = return th
closeTail th@TailHandle{ thFd = Just fd } = do
#ifdef INOTIFY
  mapM_ INotify.removeWatch (thPollWatch th)
  mapM_ INotify.removeWatch (thReopenWatch th)
#endif
  -- closing the stream closes its descriptor too, so only one of the two runs
  maybe (closeFd fd) closeDirStream (thDirStream th)
  return th
    { thFd = Nothing
    , thPos = 0
    , thIno = Nothing
    , thDirStream = Nothing
    , thDirList = Set.empty
#ifdef INOTIFY
    , thPollWatch = Nothing
    , thReopenWatch = Nothing
#endif
    }

-- | Move the descriptor of an open handle to an absolute offset and record
-- that offset as the handle position.  Fails with a user error when the
-- handle has no descriptor.
seekTail :: FileOffset -> TailHandle -> IO TailHandle
seekTail target th = case thFd th of
  Nothing -> bad "seek on closed fd"
  Just fd -> do
    -- the offset reported by the seek is ignored; the requested one is stored
    _ <- fdSeek fd AbsoluteSeek target
    return th{ thPos = target }

-- | Install inotify watches for a freshly opened path-based tail and store
-- their descriptors in the handle.  When inotify support is compiled out,
-- the runtime has no inotify instance, or the target is not a path, the
-- handle is returned unchanged.
--
-- The poll watch is installed only if polling by inotify is requested and
-- the position is non-negative; a directory target watches entry moves,
-- creations and deletions, any other target watches modification.  The
-- reopen watch reports the target itself being moved or deleted.  Events are
-- turned into 'TailSignal' values and thrown to the handle thread.
inotifyTail :: TailHandle -> IO TailHandle
#ifdef INOTIFY
inotifyTail th@TailHandle
    { thRuntime = TailRuntime{ trINotify = Just inotify }
    , thPoll = tid
    , thPos = pos
    , thTail = Tail
      { tailTarg = TailPath path
      , tailPollINotify = ipoll
      , tailReopenINotify = ireopen
    } } = do
  pollWatch <- justWhen (ipoll && pos >= 0) (watch pollEvents)
  reopenWatch <- justWhen ireopen (watch [INotify.MoveSelf, INotify.DeleteSelf])
  return th{ thPollWatch = pollWatch, thReopenWatch = reopenWatch }
  where
    pollEvents
      | isJust (thDirStream th) = [INotify.OnlyDir, INotify.Move, INotify.Create, INotify.Delete]
      | otherwise = [INotify.Modify]
    watch events = INotify.addWatch inotify events path deliver
    -- events with no corresponding signal are dropped
    deliver event = maybe (return ()) (throwTo tid) (toSignal event)
    toSignal event = case event of
      INotify.Modified {} -> Just SignalPoll
      INotify.MovedSelf {} -> Just SignalReopen
      INotify.MovedOut { INotify.filePath = f } | visible f -> Just (SignalDelete f)
      INotify.MovedIn { INotify.filePath = f } | visible f -> Just (SignalInsert f False)
      INotify.Created { INotify.filePath = f } | visible f -> Just (SignalInsert f True)
      INotify.Deleted { INotify.filePath = f } | visible f -> Just (SignalDelete f)
      _ -> Nothing
    -- empty names and names starting with a dot are ignored
    visible name = case name of
      "" -> False
      '.' : _ -> False
      _ -> True
#endif
inotifyTail th = return th

-- | C binding used to turn an already open descriptor into a directory
-- stream pointer.
foreign import ccall unsafe fdopendir :: CInt -> IO (Ptr ())
-- | Wrap an open directory descriptor in a 'DirStream'.  Raises an errno
-- based error labelled @fdOpenDirStream@ when the C call returns null.
--
-- NOTE(review): the coercion assumes 'DirStream' has the same runtime
-- representation as a raw pointer in the unix package version in use --
-- confirm before upgrading that dependency.
fdOpenDirStream :: Fd -> IO DirStream
fdOpenDirStream (Fd raw) = do
  dirp <- throwErrnoIfNull "fdOpenDirStream" (fdopendir raw)
  return (toDirStream dirp)
  where
    toDirStream :: Ptr () -> DirStream
    toDirStream = unsafeCoerce

-- | Read the remaining entries of a directory stream, stopping at the first
-- empty name and skipping every name that starts with a dot.
readDirStreamAll :: DirStream -> IO [FilePath]
readDirStreamAll stream = do
  entry <- readDirStream stream
  case entry of
    [] -> return []
    '.' : _ -> readDirStreamAll stream
    name -> fmap (name :) (readDirStreamAll stream)

-- | Register a tail for one entry of a followed directory.
--
-- Applies only to path-based tails that list or tail directory contents;
-- for any other handle nothing is done.  The child tail copies the parent
-- specification with the entry name appended to the path.  File tailing
-- follows the parent directory-tail flag, the two directory flags are kept
-- only when the parent is recursive, and the child starts from the beginning
-- if the parent does or if the entry is flagged as new.
subTail :: TailHandle -> Bool -> FilePath -> IO ()
subTail TailHandle{ thRuntime = runtime, thTail = parent@Tail{ tailTarg = TailPath dir } } new name
  | wanted = trAddTail runtime child
  where
    wanted = tailDirTail parent || tailDirList parent
    recursive = tailDirRecursive parent
    child = parent
      { tailTarg = TailPath (dir </> name)
      , tailFileTail = tailDirTail parent
      , tailDirList = tailDirList parent && recursive
      , tailDirTail = tailDirTail parent && recursive
      , tailBegin = tailBegin parent || new
      }
subTail _ _ _ = nop

-- | Open the target of a closed handle and prepare it for reading.
--
-- A descriptor target is used as is; a path target is opened read-only and
-- non-blocking, and a missing path is reported and leaves the handle closed
-- with position 0.  What happens next depends on the file type:
--
-- * regular file: seek to the start position (a negative stored position
--   counts back from one past the end, a non-negative one is capped at the
--   file size);
-- * named pipe, socket or character device: position becomes -1;
-- * directory, when directory listing or tailing is enabled: open a
--   directory stream, remember the selected entries and register a sub-tail
--   for each of them;
-- * anything else: close the descriptor, report it, return the handle as it
--   was.
--
-- Inotify watches are then installed via 'inotifyTail'.  Calling this on a
-- handle that already has a descriptor fails with a user error.
openTail :: TailHandle -> IO TailHandle
openTail th@TailHandle{ thFd = Nothing } = case tailTarg (thTail th) of
  TailFd fd -> opened (Just fd)
  TailPath path -> catchDoesNotExist (openFd path ReadOnly Nothing flags) >>= opened
  where
    flags = OpenFileFlags
      { append = False, exclusive = False, noctty = False, nonBlock = True, trunc = False }
    opened Nothing = do
      thErrMsg th "No such file or directory"
      return th{ thPos = 0 }
    opened (Just fd) = do
      setFdOption fd NonBlockingRead True -- is this really necessary?
      stat <- getFdStatus fd
      classify fd stat >>= inotifyTail
    classify fd stat
      | isRegularFile stat = seekTail start fresh
      | isNamedPipe stat || isSocket stat || isCharacterDevice stat =
          return fresh{ thPos = -1 }
      | isDirectory stat && wantsDir = do
          ds <- fdOpenDirStream fd
          entries <- readDirStreamAll ds
          let keep = if pos < 0 then genericLength entries + 1 + pos else pos
              known = genericTake keep entries
              dirTh = fresh{ thDirStream = Just ds, thDirList = Set.fromList known }
          mapM_ (subTail dirTh False) known
          return dirTh
      | otherwise = do
          closeFd fd
          thErrMsg th "Unsupported file type"
          return th
      where
        fresh = th{ thFd = Just fd, thIno = Just (fileID stat), thAgain = True, thPos = 0 }
        size = fileSize stat
        pos = thPos th
        start
          | pos < 0 = max 0 (size + 1 + pos)
          | otherwise = min size pos
        wantsDir = tailDirList (thTail th) || tailDirTail (thTail th)
openTail _ = bad "open on opened tail"

-- | Check whether the path of a tail now names a different file and, if so,
-- switch to it.  Handles whose target is not a path are returned unchanged.
--
-- Nothing changes when the path does not exist, when it still has the
-- recorded file ID, or when the replacement file is still empty.  A handle
-- with no recorded file ID is simply opened.  Otherwise the switch is
-- reported and the handle is closed and opened again.
reopenTail :: TailHandle -> IO TailHandle
reopenTail th@TailHandle{ thTail = Tail{ tailTarg = TailPath path }, thIno = ino } =
  catchDoesNotExist (getFileStatus path) >>= maybe (return th) decide
  where
    decide stat = case ino of
      Nothing -> openTail th
      Just current
        | current == fileID stat -> return th
        | fileSize stat == 0 -> return th
        | otherwise -> do
            thErrMsg th "Following new file"
            closeTail th >>= openTail
reopenTail th = return th

-- | Result of a read that produced no lines: the handle with its
-- read-again flag cleared, paired with an empty line list.
noRead :: TailHandle -> (TailHandle, [BS.ByteString])
noRead th = (idle, [])
  where
    idle = th{ thAgain = False }

-- | Largest number of bytes requested from a descriptor in a single read.
bufsiz :: Int
bufsiz = 8192

-- | Output lines for directory listing changes: the entry name prefixed
-- with @+@ for an entry that appeared and @-@ for one that went away.
insertMsg, deleteMsg :: FilePath -> BS.ByteString
insertMsg name = BS.cons '+' (BS.pack name)
deleteMsg name = BS.cons '-' (BS.pack name)

-- | Read whatever is currently available from a tail and return the updated
-- handle together with the complete lines obtained.
--
-- * Closed handle: nothing is read.
-- * Directory: the stream is rewound and re-read.  Entries not previously
--   known get a sub-tail and, when directory listing is enabled, a @+name@
--   line; previously known entries that are gone get a @-name@ line.
-- * Position -1 (pipe, socket, character device): one read of up to
--   'bufsiz' bytes.
-- * Regular file: the current size is compared with the position.  A
--   smaller size is reported as truncation and reading restarts from 0; an
--   equal size yields nothing; otherwise up to 'bufsiz' bytes are read and
--   the position advances by the amount actually read.
--
-- Data is split on newlines; the unterminated remainder is kept in the
-- handle buffer and prepended to the next read.  When a regular file has
-- no more data while the buffer is non-empty, the buffer is reported as an
-- unterminated line and dropped.
readTail :: TailHandle -> IO (TailHandle, [BS.ByteString])
readTail th@TailHandle{ thFd = Nothing } = return (noRead th)
readTail th@TailHandle{ thDirStream = Just ds } = do
  dl <- rewindDirStream ds >> readDirStreamAll ds
  -- n: new names, o: old names not seen yet, s: old names plus new names
  let df f (n, o, s)
        | Set.member f o = (n, Set.delete f o, s)
        | otherwise = (f : n, o, Set.insert f s)
  let (nl, os, s) = foldr df ([], thDirList th, thDirList th) dl
  mapM_ (subTail th True) nl
  return (th{ thDirList = Set.difference s os, thAgain = False },
    guard (tailDirList (thTail th)) >> map insertMsg nl ++ map deleteMsg (Set.toList os))
readTail th@TailHandle{ thFd = Just fd, thPos = pos } =
  if pos == -1
    -- NOTE(review): fdReadBuf is documented to return 0 at end of file
    -- rather than raise, so this handler may never run -- confirm.
    then catchWhen isEOFError readsock $ do
      noRead <$> checkbuf th
      -- thErrMsg th "closed?"
      -- closeTail th >.= noRead th
    else gotlen . fileSize =<< getFdStatus fd
  where
    -- report and discard a buffered partial line
    checkbuf th'@TailHandle{ thBuf = buf } = do
      unless (BS.null buf) $
        thErrMsg th' ("Unterminated line: " <> buf)
      return th'{ thBuf = BS.empty }
    gotlen len
      | len < pos = do
          thErrMsg th ("File truncated to " <> BS.pack (show len))
          seekTail 0 th >>= readTail
      | len == pos = do
          noRead <$> checkbuf th
      | otherwise = do
          -- clamp in the offset type before narrowing, so a large gap
          -- cannot wrap when Int is narrower than the file offset
          let count = fromIntegral (min (len - pos) (fromIntegral bufsiz)) :: Int
          buf <- readit count
          let buflen = BS.length buf
          when (buflen /= count) $
            thErrMsg th ("Short read (" <> BS.pack (show buflen) <> BS.singleton '/' <> BS.pack (show count) <> BS.singleton ')')
          -- a full buffer suggests more data is waiting, so ask to be called again
          return $ gotbuf th{ thPos = pos + fromIntegral buflen, thAgain = buflen == bufsiz } buf
    readsock =
      gotbuf th{ thAgain = True } <$> readit bufsiz
    -- split fresh data into lines, carrying the unterminated tail forward
    gotbuf th'@TailHandle{ thBuf = oldbuf } buf
      | BS.null buf = noRead th'
      | otherwise = case initLast $ BS.split '\n' buf of
          ([], r) ->
            (th'{ thBuf = oldbuf <> r }, [])
          (l1 : l, r) ->
            (th'{ thBuf = r }, (oldbuf <> l1) : l)
    -- errors accepted by isFullError are turned into an empty read
    readit len = catchWhen isFullError
      (allocaBytes len $ \p -> do
        l <- fdReadBuf fd p (fromIntegral len)
        BS.packCStringLen (castPtr p, fromIntegral l))
      (return BS.empty)

-- | Block by reading from a freshly created empty MVar that nothing else
-- holds; used as a wait that only an exception can end.
pause :: IO ()
pause = newEmptyMVar >>= readMVar

-- | Wait until the tail is worth reading again.
--
-- A closed handle pauses.  An open handle at position -1 waits for its
-- descriptor to become readable.  Otherwise a handle flagged to read again
-- merely yields, a zero poll interval pauses, and any other interval is
-- slept through.
waitTail :: TailHandle -> IO ()
waitTail th = case thFd th of
  Nothing -> pause
  Just fd
    | thPos th == -1 -> threadWaitRead fd
    | thAgain th -> yield
    | interval == 0 -> pause
    | otherwise -> threadDelayInterval interval
  where
    interval = tailPollInterval (thTail th)

-- | Body of the periodic reopen thread: repeatedly sleep for the given
-- interval and then throw 'SignalReopen' to the given thread.  Never
-- returns on its own.
reopenThread :: Interval -> ThreadId -> IO ()
reopenThread ri tid = forever (threadDelayInterval ri >> throwTo tid SignalReopen)

-- | Build the initial, closed handle for a tail on the calling thread.
--
-- The calling thread becomes the signal target.  A reopen thread is forked
-- only when the reopen interval is non-zero.  The stored position is the
-- requested start: 0 when the tail begins at the start, otherwise -1.
newTail :: TailRuntime -> Tail -> IO TailHandle
newTail runtime spec = do
  self <- myThreadId
  reopener <- justWhen (interval /= 0) (forkIOUnmasked (reopenThread interval self))
  return TailHandle
    { thTail = spec
    , thRuntime = runtime
    , thPoll = self
    , thReopen = reopener
    , thFd = Nothing
    , thPos = startPos
    , thIno = Nothing
    , thBuf = BS.empty
    , thDirStream = Nothing
    , thDirList = Set.empty
    , thAgain = True
#ifdef INOTIFY
    , thPollWatch = Nothing
    , thReopenWatch = Nothing
#endif
    }
  where
    interval = tailReopenInterval spec
    startPos
      | tailBegin spec = 0
      | otherwise = -1

-- | Follow one tail until there is nothing left that could produce output.
--
-- The whole loop runs inside 'mask'; asynchronous exceptions are explicitly
-- unmasked only around 'waitTail', which is where 'TailSignal' values thrown
-- by the reopen thread or the inotify callback are caught and handled.
-- Every line produced is passed to 'tailText' for this runtime and tail.
-- When the loop ends, the reopen thread (if any) is killed.
runTail :: TailRuntime -> Tail -> IO ()
runTail tr tl = mask $ \unmask ->
  let
    -- React to one signal and return the updated handle.
    signal th SignalReopen = reopenTail th -- >>= wait
    signal th SignalPoll = return th{ thAgain = True }
    -- The lazy directory-stream pattern always matches and is never forced.
    signal th@TailHandle{ thDirStream = ~(Just _), thDirList = l, thTail = t } (SignalInsert f new) = do
      subTail th new f
      if tailDirList t
        then proc th' [insertMsg f]
        else return th'
      where th' = th{ thDirList = Set.insert f l }
    signal th@TailHandle{ thDirStream = ~(Just _), thDirList = l, thTail = t } (SignalDelete f) = 
      if tailDirList t 
        then proc th' [deleteMsg f]
        else return th'
      where th' = th{ thDirList = Set.delete f l }
    -- Wait with exceptions unmasked; on a signal, handle it and wait again
    -- with the updated handle.
    wait th = catch
      (unmask $ waitTail th $> th)
      (signal th >=> wait)
    -- Stop when there is no reopen thread and either the tail is closed or
    -- nothing (read-again flag, readable descriptor, poll interval, inotify
    -- watch) could trigger another read; otherwise wait and read again.
    poll th
      | thReopen th == Nothing
	&& (thFd th == Nothing
	  || (thAgain th == False && thPos th /= -1
	    && tailPollInterval (thTail th) == 0
#ifdef INOTIFY
	    && thPollWatch th == Nothing 
	    && thReopenWatch th == Nothing
#endif
	    ))
	= return th
      | otherwise = wait th >>= go
    -- One iteration: read, emit the lines, then decide whether to continue.
    go th = readTail th >>= uncurry proc >>= poll
    -- Emit each line and hand the handle back.
    proc th s = mapM_ fun s $> th
    fun = tailText tr tl
  in
  newTail tr tl >>=
  openTail >>= go >>= 
  mapM_ killThread . thReopen