XRootD
XrdPfcPurge.cc
Go to the documentation of this file.
1 #include "XrdPfc.hh"
2 #include "XrdPfcTrace.hh"
3 
4 #include <fcntl.h>
5 #include <sys/time.h>
6 
7 #include "XrdOuc/XrdOucEnv.hh"
8 #include "XrdOss/XrdOssAt.hh"
9 #include "XrdSys/XrdSysTrace.hh"
10 
11 using namespace XrdPfc;
12 
13 namespace XrdPfc
14 {
15 
17 {
18  // needed for logging macros
19  return Cache::GetInstance().GetTrace();
20 }
21 
22 // Temporary, extensive purge tracing
23 // #define TRACE_PURGE(x) TRACE(Debug, x)
24 // #define TRACE_PURGE(x) std::cout << "PURGE " << x << "\n"
25 #define TRACE_PURGE(x)
26 
27 //==============================================================================
28 // DirState
29 //==============================================================================
30 
31 class DirState
32 {
33  DirState *m_parent;
34 
35  Stats m_stats; // access stats from client reads in this directory (and subdirs)
36 
37  long long m_usage; // collected / measured during purge traversal
38  long long m_usage_extra; // collected from write events in this directory and subdirs
39  long long m_usage_purged; // amount of data purged from this directory (and subdirectories for leaf nodes)
40 
41  // begin purge traversal usage \_ so we can have a good estimate of what came in during the traversal
42  // end purge traversal usage / (should be small, presumably)
43 
44  // quota info, enabled?
45 
46  int m_depth;
47  int m_max_depth; // XXXX Do we need this? Should it be passed in to find functions?
48  bool m_stat_report; // not used yet - storing of stats requested
49 
50  typedef std::map<std::string, DirState> DsMap_t;
51  typedef DsMap_t::iterator DsMap_i;
52 
53  DsMap_t m_subdirs;
54 
55  void init()
56  {
57  m_usage = 0;
58  m_usage_extra = 0;
59  m_usage_purged = 0;
60  }
61 
62  DirState* create_child(const std::string &dir)
63  {
64  std::pair<DsMap_i, bool> ir = m_subdirs.insert(std::make_pair(dir, DirState(this)));
65  return & ir.first->second;
66  }
67 
68  DirState* find_path_tok(PathTokenizer &pt, int pos, bool create_subdirs)
69  {
70  if (pos == pt.get_n_dirs()) return this;
71 
72  DsMap_i i = m_subdirs.find(pt.m_dirs[pos]);
73 
74  DirState *ds = 0;
75 
76  if (i != m_subdirs.end())
77  {
78  ds = & i->second;
79  }
80  if (create_subdirs && m_depth < m_max_depth)
81  {
82  ds = create_child(pt.m_dirs[pos]);
83  }
84  if (ds) return ds->find_path_tok(pt, pos + 1, create_subdirs);
85 
86  return 0;
87  }
88 
89 public:
90 
91  DirState(int max_depth) : m_parent(0), m_depth(0), m_max_depth(max_depth)
92  {
93  init();
94  }
95 
96  DirState(DirState *parent) : m_parent(parent), m_depth(m_parent->m_depth + 1), m_max_depth(m_parent->m_max_depth)
97  {
98  init();
99  }
100 
101  DirState* get_parent() { return m_parent; }
102 
103  void set_usage(long long u) { m_usage = u; m_usage_extra = 0; }
104  void add_up_stats(const Stats& stats) { m_stats.AddUp(stats); }
105  void add_usage_purged(long long up) { m_usage_purged += up; }
106 
107  DirState* find_path(const std::string &path, int max_depth, bool parse_as_lfn, bool create_subdirs)
108  {
109  PathTokenizer pt(path, max_depth, parse_as_lfn);
110 
111  return find_path_tok(pt, 0, create_subdirs);
112  }
113 
114  DirState* find_dir(const std::string &dir, bool create_subdirs)
115  {
116  DsMap_i i = m_subdirs.find(dir);
117 
118  if (i != m_subdirs.end()) return & i->second;
119 
120  if (create_subdirs && m_depth < m_max_depth) return create_child(dir);
121 
122  return 0;
123  }
124 
125  void reset_stats()
126  {
127  m_stats.Reset();
128 
129  for (DsMap_i i = m_subdirs.begin(); i != m_subdirs.end(); ++i)
130  {
131  i->second.reset_stats();
132  }
133  }
134 
136  {
137  for (DsMap_i i = m_subdirs.begin(); i != m_subdirs.end(); ++i)
138  {
139  i->second.upward_propagate_stats();
140 
141  m_stats.AddUp(i->second.m_stats);
142  }
143 
144  m_usage_extra += m_stats.m_BytesWritten;
145  }
146 
148  {
149  for (DsMap_i i = m_subdirs.begin(); i != m_subdirs.end(); ++i)
150  {
151  m_usage_purged += i->second.upward_propagate_usage_purged();
152  }
153  m_usage -= m_usage_purged;
154 
155  long long ret = m_usage_purged;
156  m_usage_purged = 0;
157  return ret;
158  }
159 
160  void dump_recursively(const char *name)
161  {
162  printf("%*d %s usage=%lld usage_extra=%lld usage_total=%lld num_ios=%d duration=%d b_hit=%lld b_miss=%lld b_byps=%lld b_wrtn=%lld\n",
163  2 + 2*m_depth, m_depth, name, m_usage, m_usage_extra, m_usage + m_usage_extra,
164  m_stats.m_NumIos, m_stats.m_Duration, m_stats.m_BytesHit, m_stats.m_BytesMissed, m_stats.m_BytesBypassed, m_stats.m_BytesWritten);
165 
166  for (DsMap_i i = m_subdirs.begin(); i != m_subdirs.end(); ++i)
167  {
168  i->second.dump_recursively(i->first.c_str());
169  }
170  }
171 };
172 
173 
174 //==============================================================================
175 // DataFsState
176 //==============================================================================
177 
179 {
180  int m_max_depth;
181  DirState m_root;
182  time_t m_prev_time;
183 
184 public:
186  m_max_depth ( Cache::Conf().m_dirStatsStoreDepth ),
187  m_root ( m_max_depth ),
188  m_prev_time ( time(0) )
189  {}
190 
191  int get_max_depth() const { return m_max_depth; }
192 
193  DirState* get_root() { return & m_root; }
194 
195  DirState* find_dirstate_for_lfn(const std::string& lfn)
196  {
197  return m_root.find_path(lfn, m_max_depth, true, true);
198  }
199 
200  void reset_stats() { m_root.reset_stats(); }
203 
205  {
206  time_t now = time(0);
207 
208  printf("DataFsState::dump_recursively epoch = %lld delta_t = %lld max_depth = %d\n",
209  (long long) now, (long long) (now - m_prev_time), m_max_depth);
210 
211  m_prev_time = now;
212 
213  m_root.dump_recursively("root");
214  }
215 };
216 
217 
218 //==============================================================================
219 // FPurgeState
220 //==============================================================================
221 
223 {
224 public:
225  struct FS
226  {
227  std::string path;
228  long long nBytes;
229  time_t time;
231 
232  FS(const std::string &dname, const char *fname, long long n, time_t t, DirState *ds) :
233  path(dname + fname), nBytes(n), time(t), dirState(ds)
234  {}
235  };
236 
237  typedef std::multimap<time_t, FS> map_t;
238  typedef map_t::iterator map_i;
239 
240  map_t m_fmap; // map of files that are purge candidates
241 
242  typedef std::list<FS> list_t;
243  typedef list_t::iterator list_i;
244 
245  list_t m_flist; // list of files to be removed unconditionally
246 
247  long long nBytesReq;
248  long long nBytesAccum;
249  long long nBytesTotal;
252 
253  // XrdOss *m_oss;
255 
256  // ------------------------------------
257  // Directory handling & stat collection
258  // ------------------------------------
259 
261  std::string m_current_path; // Includes trailing '/'
263  const int m_max_dir_level_for_stat_collection; // until we honor globs from pfc.dirstats
264 
265  std::vector<std::string> m_dir_names_stack;
266  std::vector<long long> m_dir_usage_stack;
267 
268  const char *m_info_ext;
269  const size_t m_info_ext_len;
271 
272  static const char *m_traceID;
273 
274 
275  void begin_traversal(DirState *root, const char *root_path = "/")
276  {
277  m_dir_state = root;
278  m_dir_level = 0;
279  m_current_path = std::string(root_path);
280  m_dir_usage_stack.push_back(0);
281 
282  TRACE_PURGE("FPurgeState::begin_traversal cur_path '" << m_current_path << "', usage=" << m_dir_usage_stack.back() << ", level=" << m_dir_level);
283  }
284 
286  {
287  TRACE_PURGE("FPurgeState::end_traversal reporting for '" << m_current_path << "', usage=" << m_dir_usage_stack.back() << ", nBytesTotal=" << nBytesTotal << ", level=" << m_dir_level);
288 
290 
291  m_dir_state = 0;
292  }
293 
294  void cd_down(const std::string& dir_name)
295  {
296  ++m_dir_level;
297 
299  {
300  m_dir_usage_stack.push_back(0);
301  m_dir_state = m_dir_state->find_dir(dir_name, true);
302  }
303 
304  m_dir_names_stack.push_back(dir_name);
305  m_current_path.append(dir_name);
306  m_current_path.append("/");
307  }
308 
309  void cd_up()
310  {
312  {
313  long long tail = m_dir_usage_stack.back();
314  m_dir_usage_stack.pop_back();
315 
316  TRACE_PURGE("FPurgeState::cd_up reporting for '" << m_current_path << "', usage=" << tail << ", level=" << m_dir_level);
317 
318  m_dir_state->set_usage(tail);
320 
321  m_dir_usage_stack.back() += tail;
322  }
323 
324  // remove trailing / and last dir but keep the new trailing / in place.
325  m_current_path.erase(m_current_path.find_last_of('/', m_current_path.size() - 2) + 1);
326  m_dir_names_stack.pop_back();
327 
328  --m_dir_level;
329  }
330 
331  // ------------------------------------------------------------------------
332  // ------------------------------------------------------------------------
333 
334  FPurgeState(long long iNBytesReq, XrdOss &oss) :
336  // m_oss(oss),
337  m_oss_at(oss),
338  m_dir_state(0), m_dir_level(0),
339  m_max_dir_level_for_stat_collection(Cache::Conf().m_dirStatsStoreDepth),
340  m_info_ext(XrdPfc::Info::s_infoExtension),
341  m_info_ext_len(strlen(XrdPfc::Info::s_infoExtension)),
342  m_trace(Cache::GetInstance().GetTrace())
343  {
344  m_current_path.reserve(256);
345  m_dir_names_stack.reserve(32);
347  }
348 
349  // ------------------------------------------------------------------------
350 
351  void setMinTime(time_t min_time) { tMinTimeStamp = min_time; }
352  time_t getMinTime() const { return tMinTimeStamp; }
353  void setUVKeepMinTime(time_t min_time) { tMinUVKeepTimeStamp = min_time; }
354  long long getNBytesTotal() const { return nBytesTotal; }
355 
357  {
358  for (list_i i = m_flist.begin(); i != m_flist.end(); ++i)
359  {
360  m_fmap.insert(std::make_pair(i->time, *i));
361  }
362  m_flist.clear();
363  }
364 
365  /*
366  void UnlinkInfoAndData(const char *fname, long long nbytes, XrdOssDF *iOssDF)
367  {
368  fname[fname_len - m_info_ext_len] = 0;
369  if (nbytes > 0)
370  {
371  if ( ! Cache.GetInstance().IsFileActiveOrPurgeProtected(dataPath))
372  {
373  m_n_purged++;
374  m_bytes_purged += nbytes;
375  } else
376  {
377  m_n_purge_protected++;
378  m_bytes_purge_protected += nbytes;
379  m_dir_state->add_usage_purged(nbytes);
380  // XXXX should also tweak other stuff?
381  fname[fname_len - m_info_ext_len] = '.';
382  return;
383  }
384  }
385  m_oss_at.Unlink(*iOssDF, fname);
386  fname[fname_len - m_info_ext_len] = '.';
387  m_oss_at.Unlink(*iOssDF, fname);
388  }
389  */
390 
391  void CheckFile(const char *fname, Info &info, struct stat &fstat /*, XrdOssDF *iOssDF*/)
392  {
393  static const char *trc_pfx = "FPurgeState::CheckFile ";
394 
395  long long nbytes = info.GetNDownloadedBytes();
396  time_t atime;
397  if ( ! info.GetLatestDetachTime(atime))
398  {
399  // cinfo file does not contain any known accesses, use fstat.mtime instead.
400  TRACE(Debug, trc_pfx << "could not get access time for " << m_current_path << fname << ", using mtime from stat instead.");
401  atime = fstat.st_mtime;
402  }
403  // TRACE(Dump, trc_pfx << "checking " << fname << " accessTime " << atime);
404 
405  nBytesTotal += nbytes;
406 
407  m_dir_usage_stack.back() += nbytes;
408 
409  // XXXX Should remove aged-out files here ... but I have trouble getting
410  // the DirState and purge report set up consistently.
411  // Need some serious code reorganization here.
412  // Biggest problem is maintaining overall state a traversal state consistently.
413  // Sigh.
414 
415  // In first two cases we lie about FS time (set to 0) to get them all removed early.
416  // The age-based purge atime would also be good as there should be nothing
417  // before that time in the map anyway.
418  // But we use 0 as a test in purge loop to make sure we continue even if enough
419  // disk-space has been freed.
420 
421  if (tMinTimeStamp > 0 && atime < tMinTimeStamp)
422  {
423  m_flist.push_back(FS(m_current_path, fname, nbytes, 0, m_dir_state));
424  nBytesAccum += nbytes;
425  }
426  else if (tMinUVKeepTimeStamp > 0 &&
427  Cache::Conf().does_cschk_have_missing_bits(info.GetCkSumState()) &&
429  {
430  m_flist.push_back(FS(m_current_path, fname, nbytes, 0, m_dir_state));
431  nBytesAccum += nbytes;
432  }
433  else if (nBytesAccum < nBytesReq || ( ! m_fmap.empty() && atime < m_fmap.rbegin()->first))
434  {
435  m_fmap.insert(std::make_pair(atime, FS(m_current_path, fname, nbytes, atime, m_dir_state)));
436  nBytesAccum += nbytes;
437 
438  // remove newest files from map if necessary
439  while ( ! m_fmap.empty() && nBytesAccum - m_fmap.rbegin()->second.nBytes >= nBytesReq)
440  {
441  nBytesAccum -= m_fmap.rbegin()->second.nBytes;
442  m_fmap.erase(--(m_fmap.rbegin().base()));
443  }
444  }
445  }
446 
448  {
449  static const char *trc_pfx = "FPurgeState::TraverseNamespace ";
450 
451  char fname[256];
452  struct stat fstat;
453  XrdOucEnv env;
454 
455  TRACE_PURGE("Starting to read dir [" << m_current_path << "], iOssDF->getFD()=" << iOssDF->getFD() << ".");
456 
457  iOssDF->StatRet(&fstat);
458 
459  while (true)
460  {
461  int rc = iOssDF->Readdir(fname, 256);
462 
463  if (rc == -ENOENT) {
464  TRACE_PURGE(" Skipping ENOENT dir entry [" << fname << "].");
465  continue;
466  }
467  if (rc != XrdOssOK) {
468  TRACE(Error, trc_pfx << "Readdir error at " << m_current_path << ", err " << XrdSysE2T(-rc) << ".");
469  break;
470  }
471 
472  TRACE_PURGE(" Readdir [" << fname << "]");
473 
474  if (fname[0] == 0) {
475  TRACE_PURGE(" Finished reading dir [" << m_current_path << "]. Break loop.");
476  break;
477  }
478  if (fname[0] == '.' && (fname[1] == 0 || (fname[1] == '.' && fname[2] == 0))) {
479  TRACE_PURGE(" Skipping here or parent dir [" << fname << "]. Continue loop.");
480  continue;
481  }
482 
483  size_t fname_len = strlen(fname);
484  XrdOssDF *dfh = 0;
485 
486  if (S_ISDIR(fstat.st_mode))
487  {
488  if (m_oss_at.Opendir(*iOssDF, fname, env, dfh) == XrdOssOK)
489  {
490  cd_down(fname); TRACE_PURGE(" cd_down -> [" << m_current_path << "].");
491  TraverseNamespace(dfh);
492  cd_up(); TRACE_PURGE(" cd_up -> [" << m_current_path << "].");
493  }
494  else
495  TRACE(Warning, trc_pfx << "could not opendir [" << m_current_path << fname << "], " << XrdSysE2T(errno));
496  }
497  else if (fname_len > m_info_ext_len && strncmp(&fname[fname_len - m_info_ext_len], m_info_ext, m_info_ext_len) == 0)
498  {
499  // Check if the file is currently opened / purge-protected is done before unlinking of the file.
500 
501  Info cinfo(m_trace);
502 
503  if (m_oss_at.OpenRO(*iOssDF, fname, env, dfh) == XrdOssOK && cinfo.Read(dfh, m_current_path.c_str(), fname))
504  {
505  CheckFile(fname, cinfo, fstat);
506  }
507  else
508  {
509  TRACE(Warning, trc_pfx << "can't open or read " << m_current_path << fname << ", err " << XrdSysE2T(errno) << "; purging.");
510  m_oss_at.Unlink(*iOssDF, fname);
511  fname[fname_len - m_info_ext_len] = 0;
512  m_oss_at.Unlink(*iOssDF, fname);
513  }
514  }
515  else // XXXX devel debug only, to be removed
516  {
517  TRACE_PURGE(" Ignoring [" << fname << "], not a dir or cinfo.");
518  }
519 
520  delete dfh;
521  }
522  }
523 };
524 
525 const char *FPurgeState::m_traceID = "Purge";
526 
527 
528 //==============================================================================
529 // ResourceMonitor
530 //==============================================================================
531 
532 // Encapsulates local variables used withing the previous mega-function Purge().
533 //
534 // This will be used within the continuously/periodically ran heart-beat / breath
535 // function ... and then parts of it will be passed to invoked FS scan and purge
536 // jobs (which will be controlled throught this as well).
537 
539 {
540 
541 };
542 
543 
544 //==============================================================================
545 //
546 //==============================================================================
547 
548 namespace
549 {
550 
551 class ScanAndPurgeJob : public XrdJob
552 {
553 public:
554  ScanAndPurgeJob(const char *desc = "") : XrdJob(desc) {}
555 
556  void DoIt() {} // { Cache::GetInstance().ScanAndPurge(); }
557 };
558 
559 }
560 
561 //==============================================================================
562 // Cache methods
563 //==============================================================================
564 
565 void Cache::copy_out_active_stats_and_update_data_fs_state()
566 {
567  static const char *trc_pfx = "copy_out_active_stats_and_update_data_fs_state() ";
568 
569  StatsMMap_t updates;
570  {
571  XrdSysCondVarHelper lock(&m_active_cond);
572 
573  // Slurp in stats from files closed since last cycle.
574  updates.swap( m_closed_files_stats );
575 
576  for (ActiveMap_i i = m_active.begin(); i != m_active.end(); ++i)
577  {
578  if (i->second != 0)
579  {
580  updates.insert(std::make_pair(i->first, i->second->DeltaStatsFromLastCall()));
581  }
582  }
583  }
584 
585  m_fs_state->reset_stats(); // XXXX-CKSUM rethink how to do this if we keep some purge entries for next time
586 
587  for (StatsMMap_i i = updates.begin(); i != updates.end(); ++i)
588  {
589  DirState *ds = m_fs_state->find_dirstate_for_lfn(i->first);
590 
591  if (ds == 0)
592  {
593  TRACE(Error, trc_pfx << "Failed finding DirState for file '" << i->first << "'.");
594  continue;
595  }
596 
597  ds->add_up_stats(i->second);
598  }
599 
600  m_fs_state->upward_propagate_stats();
601 }
602 
603 
604 //==============================================================================
605 
607 {
608  // static const char *trc_pfx = "ResourceMonitorHeartBeat() ";
609 
610  // Pause before initial run
611  sleep(1);
612 
613  // XXXX Setup initial / constant stats (total RAM, total disk, ???)
614 
617 
618  S.Lock();
619 
620  X.DiskSize = m_configuration.m_diskTotalSpace;
621 
622  X.MemSize = m_configuration.m_RamAbsAvailable;
623 
624  S.UnLock();
625 
626  // XXXX Schedule initial disk scan, time it!
627  //
628  // TRACE(Info, trc_pfx << "scheduling intial disk scan.");
629  // schedP->Schedule( new ScanAndPurgeJob("XrdPfc::ScanAndPurge") );
630  //
631  // bool scan_and_purge_running = true;
632 
633  // XXXX Could we really hold last-usage for all files in memory?
634 
635  // XXXX Think how to handle disk-full, scan/purge not finishing:
636  // - start dropping things out of write queue, but only when RAM gets near full;
637  // - monitoring this then becomes a high-priority job, inner loop with sleep of,
638  // say, 5 or 10 seconds.
639 
640  while (true)
641  {
642  time_t heartbeat_start = time(0);
643 
644  // TRACE(Info, trc_pfx << "HeartBeat starting ...");
645 
646  // if sumary monitoring configured, pupulate OucCacheStats:
647  S.Lock();
648 
649  // - available / used disk space (files usage calculated elsewhere (maybe))
650 
651  // - RAM usage
652  { XrdSysMutexHelper lck(&m_RAM_mutex);
653  X.MemUsed = m_RAM_used;
654  X.MemWriteQ = m_RAM_write_queue;
655  }
656  // - files opened / closed etc
657 
658  // do estimate of available space
659  S.UnLock();
660 
661  // if needed, schedule purge in a different thread.
662  // purge is:
663  // - deep scan + gather FSPurgeState
664  // - actual purge
665  //
666  // this thread can continue running and, if needed, stop writing to disk
667  // if purge is taking too long.
668 
669  // think how data is passed / synchronized between this and purge thread
670 
671  // !!!! think how stat collection is done and propgated upwards;
672  // until now it was done once per purge-interval.
673  // now stats will be added up more often, but purge will be done
674  // only occasionally.
675  // also, do we report cumulative values or deltas? cumulative should
676  // be easier and consistent with summary data.
677  // still, some are state - like disk usage, num of files.
678 
679  // Do we take care of directories that need to be newly added into DirState hierarchy?
680  // I.e., when user creates new directories and these are covered by either full
681  // spec or by root + depth declaration.
682 
683  int heartbeat_duration = time(0) - heartbeat_start;
684 
685  // TRACE(Info, trc_pfx << "HeartBeat finished, heartbeat_duration " << heartbeat_duration);
686 
687  // int sleep_time = m_configuration.m_purgeInterval - heartbeat_duration;
688  int sleep_time = 60 - heartbeat_duration;
689  if (sleep_time > 0)
690  {
691  sleep(sleep_time);
692  }
693  }
694 }
695 
696 //==============================================================================
697 
699 {
700  static const char *trc_pfx = "Purge() ";
701 
702  XrdOucEnv env;
703  long long disk_usage;
704  long long estimated_file_usage = m_configuration.m_diskUsageHWM;
705 
706  // Pause before initial run
707  sleep(1);
708 
709  m_fs_state = new DataFsState;
710 
711  // { PathTokenizer p("/a/b/c/f.root", 2, true); p.deboog(); }
712  // { PathTokenizer p("/a/b/f.root", 2, true); p.deboog(); }
713  // { PathTokenizer p("/a/f.root", 2, true); p.deboog(); }
714  // { PathTokenizer p("/f.root", 2, true); p.deboog(); }
715 
716  int age_based_purge_countdown = 0; // enforce on first purge loop entry.
717  bool is_first = true;
718 
719  while (true)
720  {
721  time_t purge_start = time(0);
722 
723  {
724  XrdSysCondVarHelper lock(&m_active_cond);
725 
726  m_in_purge = true;
727  }
728 
729  TRACE(Info, trc_pfx << "Started.");
730 
731  // Bytes to remove based on total disk usage (d) and file usage (f).
732  long long bytesToRemove_d = 0, bytesToRemove_f = 0;
733 
734  // get amount of space to potentially erase based on total disk usage
735  XrdOssVSInfo sP; // Make sure we start when a clean slate in each loop
736  if (m_oss->StatVS(&sP, m_configuration.m_data_space.c_str(), 1) < 0)
737  {
738  TRACE(Error, trc_pfx << "can't get StatVS for oss space " << m_configuration.m_data_space);
739  continue;
740  }
741  else
742  {
743  disk_usage = sP.Total - sP.Free;
744  TRACE(Debug, trc_pfx << "used disk space " << disk_usage << " bytes.");
745 
746  if (disk_usage > m_configuration.m_diskUsageHWM)
747  {
748  bytesToRemove_d = disk_usage - m_configuration.m_diskUsageLWM;
749  }
750  }
751 
752  // estimate amount of space to erase based on file usage
753  if (m_configuration.are_file_usage_limits_set())
754  {
755  long long estimated_writes_since_last_purge;
756  {
757  XrdSysCondVarHelper lock(&m_writeQ.condVar);
758 
759  estimated_writes_since_last_purge = m_writeQ.writes_between_purges;
760  m_writeQ.writes_between_purges = 0;
761  }
762  estimated_file_usage += estimated_writes_since_last_purge;
763 
764  TRACE(Debug, trc_pfx << "estimated usage by files " << estimated_file_usage << " bytes.");
765 
766  bytesToRemove_f = std::max(estimated_file_usage - m_configuration.m_fileUsageNominal, 0ll);
767 
768  // Here we estimate fractional usages -- to decide if full scan is necessary before actual purge.
769  double frac_du = 0, frac_fu = 0;
770  m_configuration.calculate_fractional_usages(disk_usage, estimated_file_usage, frac_du, frac_fu);
771 
772  if (frac_fu > 1.0 - frac_du)
773  {
774  bytesToRemove_f = std::max(bytesToRemove_f, disk_usage - m_configuration.m_diskUsageLWM);
775  }
776  }
777 
778  long long bytesToRemove = std::max(bytesToRemove_d, bytesToRemove_f);
779 
780  bool enforce_age_based_purge = false;
781  if (m_configuration.is_age_based_purge_in_effect() || m_configuration.is_uvkeep_purge_in_effect())
782  {
783  // XXXX ... I could collect those guys in larger vectors (maps?) and do traversal when
784  // they are empty.
785  if (--age_based_purge_countdown <= 0)
786  {
787  enforce_age_based_purge = true;
788  age_based_purge_countdown = m_configuration.m_purgeAgeBasedPeriod;
789  }
790  }
791 
792  bool enforce_traversal_for_usage_collection = is_first;
793  // XXX Other conditions? Periodic checks?
794 
795  copy_out_active_stats_and_update_data_fs_state();
796 
797  TRACE(Debug, trc_pfx << "Precheck:");
798  TRACE(Debug, "\tbytes_to_remove_disk = " << bytesToRemove_d << " B");
799  TRACE(Debug, "\tbytes_to remove_files = " << bytesToRemove_f << " B (" << (is_first ? "max possible for initial run" : "estimated") << ")");
800  TRACE(Debug, "\tbytes_to_remove = " << bytesToRemove << " B");
801  TRACE(Debug, "\tenforce_age_based_purge = " << enforce_age_based_purge);
802  is_first = false;
803 
804  long long bytesToRemove_at_start = 0; // set after file scan
805  int deleted_file_count = 0;
806 
807  bool purge_required = (bytesToRemove > 0 || enforce_age_based_purge);
808 
809  // XXXX-PurgeOpt Need to retain this state between purges so I can avoid doing
810  // the traversal more often than really needed.
811  FPurgeState purgeState(2 * bytesToRemove, *m_oss); // prepare twice more volume than required
812 
813  if (purge_required || enforce_traversal_for_usage_collection)
814  {
815  // Make a sorted map of file paths sorted by access time.
816 
817  if (m_configuration.is_age_based_purge_in_effect())
818  {
819  purgeState.setMinTime(time(0) - m_configuration.m_purgeColdFilesAge);
820  }
821  if (m_configuration.is_uvkeep_purge_in_effect())
822  {
823  purgeState.setUVKeepMinTime(time(0) - m_configuration.m_cs_UVKeep);
824  }
825 
826  XrdOssDF* dh = m_oss->newDir(m_configuration.m_username.c_str());
827  if (dh->Opendir("/", env) == XrdOssOK)
828  {
829  purgeState.begin_traversal(m_fs_state->get_root());
830 
831  purgeState.TraverseNamespace(dh);
832 
833  purgeState.end_traversal();
834 
835  dh->Close();
836  }
837  delete dh; dh = 0;
838 
839  estimated_file_usage = purgeState.getNBytesTotal();
840 
841  TRACE(Debug, trc_pfx << "actual usage by files " << estimated_file_usage << " bytes.");
842 
843  // Adjust bytesToRemove_f and then bytesToRemove based on actual file usage,
844  // possibly retreating below nominal file usage (but not below baseline file usage).
845  if (m_configuration.are_file_usage_limits_set())
846  {
847  bytesToRemove_f = std::max(estimated_file_usage - m_configuration.m_fileUsageNominal, 0ll);
848 
849  double frac_du = 0, frac_fu = 0;
850  m_configuration.calculate_fractional_usages(disk_usage, estimated_file_usage, frac_du, frac_fu);
851 
852  if (frac_fu > 1.0 - frac_du)
853  {
854  bytesToRemove = std::max(bytesToRemove_f, disk_usage - m_configuration.m_diskUsageLWM);
855  bytesToRemove = std::min(bytesToRemove, estimated_file_usage - m_configuration.m_fileUsageBaseline);
856  }
857  else
858  {
859  bytesToRemove = std::max(bytesToRemove_d, bytesToRemove_f);
860  }
861  }
862  else
863  {
864  bytesToRemove = std::max(bytesToRemove_d, bytesToRemove_f);
865  }
866  bytesToRemove_at_start = bytesToRemove;
867 
868  TRACE(Debug, trc_pfx << "After scan:");
869  TRACE(Debug, "\tbytes_to_remove_disk = " << bytesToRemove_d << " B");
870  TRACE(Debug, "\tbytes_to remove_files = " << bytesToRemove_f << " B (measured)");
871  TRACE(Debug, "\tbytes_to_remove = " << bytesToRemove << " B");
872  TRACE(Debug, "\tenforce_age_based_purge = " << enforce_age_based_purge);
873  TRACE(Debug, "\tmin_time = " << purgeState.getMinTime());
874 
875  if (enforce_age_based_purge)
876  {
877  purgeState.MoveListEntriesToMap();
878  }
879  }
880 
881  // Dump statistcs before actual purging so maximum usage values get recorded.
882  // Should really go to gstream --- and should really go from Heartbeat.
883  if (m_configuration.is_dir_stat_reporting_on())
884  {
885  m_fs_state->dump_recursively();
886  }
887 
888  if (purge_required)
889  {
890  // Loop over map and remove files with oldest values of access time.
891  struct stat fstat;
892  size_t info_ext_len = strlen(Info::s_infoExtension);
893  int protected_cnt = 0;
894  long long protected_sum = 0;
895  for (FPurgeState::map_i it = purgeState.m_fmap.begin(); it != purgeState.m_fmap.end(); ++it)
896  {
897  // Finish when enough space has been freed but not while age-based purging is in progress.
898  // Those files are marked with time-stamp = 0.
899  if (bytesToRemove <= 0 && ! (enforce_age_based_purge && it->first == 0))
900  {
901  break;
902  }
903 
904  std::string &infoPath = it->second.path;
905  std::string dataPath = infoPath.substr(0, infoPath.size() - info_ext_len);
906 
907  if (IsFileActiveOrPurgeProtected(dataPath))
908  {
909  ++protected_cnt;
910  protected_sum += it->second.nBytes;
911  TRACE(Debug, trc_pfx << "File is active or purge-protected: " << dataPath << " size: " << it->second.nBytes);
912  continue;
913  }
914 
915  // remove info file
916  if (m_oss->Stat(infoPath.c_str(), &fstat) == XrdOssOK)
917  {
918  // cinfo file can be on another oss.space, do not subtract for now.
919  // Could be relevant for very small block sizes.
920  // bytesToRemove -= fstat.st_size;
921  // estimated_file_usage -= fstat.st_size;
922  // ++deleted_file_count;
923 
924  m_oss->Unlink(infoPath.c_str());
925  TRACE(Dump, trc_pfx << "Removed file: '" << infoPath << "' size: " << fstat.st_size);
926  }
927 
928  // remove data file
929  if (m_oss->Stat(dataPath.c_str(), &fstat) == XrdOssOK)
930  {
931  bytesToRemove -= it->second.nBytes;
932  estimated_file_usage -= it->second.nBytes;
933  ++deleted_file_count;
934 
935  m_oss->Unlink(dataPath.c_str());
936  TRACE(Dump, trc_pfx << "Removed file: '" << dataPath << "' size: " << it->second.nBytes << ", time: " << it->first);
937 
938  if (it->second.dirState != 0) // XXXX This should now always be true.
939  it->second.dirState->add_usage_purged(it->second.nBytes);
940  else
941  TRACE(Error, trc_pfx << "DirState not set for file '" << dataPath << "'.");
942  }
943  }
944  if (protected_cnt > 0)
945  {
946  TRACE(Info, trc_pfx << "Encountered " << protected_cnt << " protected files, sum of their size: " << protected_sum);
947  }
948 
949  m_fs_state->upward_propagate_usage_purged();
950  }
951 
952  {
953  XrdSysCondVarHelper lock(&m_active_cond);
954 
955  m_purge_delay_set.clear();
956  m_in_purge = false;
957  }
958 
959  int purge_duration = time(0) - purge_start;
960 
961  TRACE(Info, trc_pfx << "Finished, removed " << deleted_file_count << " data files, total size " <<
962  bytesToRemove_at_start - bytesToRemove << ", bytes to remove at end " << bytesToRemove << ", purge duration " << purge_duration);
963 
964  int sleep_time = m_configuration.m_purgeInterval - purge_duration;
965  if (sleep_time > 0)
966  {
967  sleep(sleep_time);
968  }
969  }
970 }
971 
972 } // end XrdPfc namespace
int DoIt(int argpnt, int argc, char **argv, bool singleshot)
Definition: XrdAccTest.cc:262
static void parent()
#define XrdOssOK
Definition: XrdOss.hh:50
#define TRACE_PURGE(x)
Definition: XrdPfcPurge.cc:25
int stat(const char *path, struct stat *buf)
int fstat(int fildes, struct stat *buf)
bool Debug
const char * XrdSysE2T(int errcode)
Definition: XrdSysE2T.cc:104
@ Error
#define TRACE(act, x)
Definition: XrdTrace.hh:63
Definition: XrdJob.hh:43
int OpenRO(XrdOssDF &atDir, const char *path, XrdOucEnv &env, XrdOssDF *&ossDF)
Definition: XrdOssAt.cc:127
int Opendir(XrdOssDF &atDir, const char *path, XrdOucEnv &env, XrdOssDF *&ossDF)
Definition: XrdOssAt.cc:96
int Unlink(XrdOssDF &atDir, const char *path)
Definition: XrdOssAt.cc:199
virtual int StatRet(struct stat *buff)
Definition: XrdOss.hh:107
virtual int Opendir(const char *path, XrdOucEnv &env)
Definition: XrdOss.hh:79
virtual int Readdir(char *buff, int blen)
Definition: XrdOss.hh:92
virtual int Close(long long *retsz=0)=0
virtual int getFD()
Definition: XrdOss.hh:426
long long Total
Definition: XrdOssVS.hh:90
long long Free
Definition: XrdOssVS.hh:91
virtual XrdOssDF * newDir(const char *tident)=0
virtual int StatVS(XrdOssVSInfo *vsP, const char *sname=0, int updt=0)
Definition: XrdOss.cc:117
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
virtual int Unlink(const char *path, int Opts=0, XrdOucEnv *envP=0)=0
struct XrdOucCacheStats::CacheStats X
XrdOucCacheStats Statistics
Definition: XrdOucCache.hh:686
Attaches/creates and detaches/deletes cache-io objects for disk based cache.
Definition: XrdPfc.hh:267
static const Configuration & Conf()
Definition: XrdPfc.cc:165
XrdSysTrace * GetTrace()
Definition: XrdPfc.hh:402
void Purge()
Thread function invoked to scan and purge files from disk when needed.
Definition: XrdPfcPurge.cc:698
static Cache & GetInstance()
Singleton access.
Definition: XrdPfc.cc:163
void ResourceMonitorHeartBeat()
Thread function checking resource usage periodically.
Definition: XrdPfcPurge.cc:606
bool IsFileActiveOrPurgeProtected(const std::string &)
Definition: XrdPfc.cc:684
DirState * get_root()
Definition: XrdPfcPurge.cc:193
int get_max_depth() const
Definition: XrdPfcPurge.cc:191
void upward_propagate_usage_purged()
Definition: XrdPfcPurge.cc:202
void upward_propagate_stats()
Definition: XrdPfcPurge.cc:201
DirState * find_dirstate_for_lfn(const std::string &lfn)
Definition: XrdPfcPurge.cc:195
DirState(int max_depth)
Definition: XrdPfcPurge.cc:91
DirState * get_parent()
Definition: XrdPfcPurge.cc:101
void add_usage_purged(long long up)
Definition: XrdPfcPurge.cc:105
void dump_recursively(const char *name)
Definition: XrdPfcPurge.cc:160
long long upward_propagate_usage_purged()
Definition: XrdPfcPurge.cc:147
DirState * find_path(const std::string &path, int max_depth, bool parse_as_lfn, bool create_subdirs)
Definition: XrdPfcPurge.cc:107
DirState * find_dir(const std::string &dir, bool create_subdirs)
Definition: XrdPfcPurge.cc:114
void upward_propagate_stats()
Definition: XrdPfcPurge.cc:135
DirState(DirState *parent)
Definition: XrdPfcPurge.cc:96
void set_usage(long long u)
Definition: XrdPfcPurge.cc:103
void add_up_stats(const Stats &stats)
Definition: XrdPfcPurge.cc:104
time_t getMinTime() const
Definition: XrdPfcPurge.cc:352
const int m_max_dir_level_for_stat_collection
Definition: XrdPfcPurge.cc:263
std::vector< long long > m_dir_usage_stack
Definition: XrdPfcPurge.cc:266
std::multimap< time_t, FS > map_t
Definition: XrdPfcPurge.cc:237
const char * m_info_ext
Definition: XrdPfcPurge.cc:268
void CheckFile(const char *fname, Info &info, struct stat &fstat)
Definition: XrdPfcPurge.cc:391
map_t::iterator map_i
Definition: XrdPfcPurge.cc:238
std::vector< std::string > m_dir_names_stack
Definition: XrdPfcPurge.cc:265
void setUVKeepMinTime(time_t min_time)
Definition: XrdPfcPurge.cc:353
list_t::iterator list_i
Definition: XrdPfcPurge.cc:243
XrdSysTrace * m_trace
Definition: XrdPfcPurge.cc:270
const size_t m_info_ext_len
Definition: XrdPfcPurge.cc:269
static const char * m_traceID
Definition: XrdPfcPurge.cc:272
std::list< FS > list_t
Definition: XrdPfcPurge.cc:242
void cd_down(const std::string &dir_name)
Definition: XrdPfcPurge.cc:294
void TraverseNamespace(XrdOssDF *iOssDF)
Definition: XrdPfcPurge.cc:447
void setMinTime(time_t min_time)
Definition: XrdPfcPurge.cc:351
FPurgeState(long long iNBytesReq, XrdOss &oss)
Definition: XrdPfcPurge.cc:334
long long getNBytesTotal() const
Definition: XrdPfcPurge.cc:354
void begin_traversal(DirState *root, const char *root_path="/")
Definition: XrdPfcPurge.cc:275
std::string m_current_path
Definition: XrdPfcPurge.cc:261
DirState * m_dir_state
Definition: XrdPfcPurge.cc:260
Status of cached file. Can be read from and written into a binary file.
Definition: XrdPfcInfo.hh:45
static const char * s_infoExtension
Definition: XrdPfcInfo.hh:313
time_t GetNoCkSumTimeForUVKeep() const
Definition: XrdPfcInfo.hh:305
CkSumCheck_e GetCkSumState() const
Definition: XrdPfcInfo.hh:290
bool GetLatestDetachTime(time_t &t) const
Get latest detach time.
Definition: XrdPfcInfo.cc:472
long long GetNDownloadedBytes() const
Get number of downloaded bytes.
Definition: XrdPfcInfo.hh:411
bool Read(XrdOssDF *fp, const char *dname, const char *fname=0)
Read content of cinfo file into this object.
Definition: XrdPfcInfo.cc:296
Statistics of cache utilisation by a File object.
Definition: XrdPfcStats.hh:31
long long m_BytesMissed
number of bytes served from remote and cached
Definition: XrdPfcStats.hh:36
long long m_BytesBypassed
number of bytes served directly through XrdCl
Definition: XrdPfcStats.hh:37
void AddUp(const Stats &s)
Definition: XrdPfcStats.hh:118
int m_Duration
total duration of all IOs attached
Definition: XrdPfcStats.hh:34
int m_NumIos
number of IO objects attached during this access
Definition: XrdPfcStats.hh:33
long long m_BytesHit
number of bytes served from disk
Definition: XrdPfcStats.hh:35
long long m_BytesWritten
number of bytes written to disk
Definition: XrdPfcStats.hh:38
Definition: XrdPfc.hh:41
XrdSysTrace * GetTrace()
Definition: XrdPfcPurge.cc:16
long long m_RamAbsAvailable
available from configuration
Definition: XrdPfc.hh:102
long long m_diskTotalSpace
total disk space on configured partition or oss space
Definition: XrdPfc.hh:85
long long m_fileUsageBaseline
cache purge - files usage baseline
Definition: XrdPfc.hh:88
long long m_diskUsageHWM
cache purge - disk usage high water mark
Definition: XrdPfc.hh:87
bool is_uvkeep_purge_in_effect() const
Definition: XrdPfc.hh:61
bool are_file_usage_limits_set() const
Definition: XrdPfc.hh:59
long long m_fileUsageNominal
cache purge - files usage nominal
Definition: XrdPfc.hh:89
int m_purgeAgeBasedPeriod
perform cold file / uvkeep purge every this many purge cycles
Definition: XrdPfc.hh:93
int m_purgeColdFilesAge
purge files older than this age
Definition: XrdPfc.hh:92
std::string m_data_space
oss space for data files
Definition: XrdPfc.hh:82
void calculate_fractional_usages(long long du, long long fu, double &frac_du, double &frac_fu)
Definition: XrdPfc.cc:135
long long m_diskUsageLWM
cache purge - disk usage low water mark
Definition: XrdPfc.hh:86
bool is_age_based_purge_in_effect() const
Definition: XrdPfc.hh:60
std::string m_username
username passed to oss plugin
Definition: XrdPfc.hh:81
time_t m_cs_UVKeep
unverified checksum cache keep
Definition: XrdPfc.hh:111
int m_purgeInterval
sleep interval between cache purges
Definition: XrdPfc.hh:91
bool is_dir_stat_reporting_on() const
Definition: XrdPfc.hh:62
FS(const std::string &dname, const char *fname, long long n, time_t t, DirState *ds)
Definition: XrdPfcPurge.cc:232
std::vector< const char * > m_dirs
Definition: XrdPfc.hh:187