
Go snippets from the new InfluxDB storage engine

Paul Dix
October 02, 2015

Slides from my talk at GothamGo 2015

Transcript

  1. Go snippets from the new InfluxDB storage engine

    Paul Dix, CEO at InfluxDB, @pauldix, paul@influxdb.com
  2. Arranging in Key/Value Stores

    Key             Value
    1,1443782126    80
    2,1443782126    18
    1,1443782127    81
    2,1443782256    15
    2,1443782130    17
    3,1443700126    18
  3. Components (similar to LSM Trees)

    WAL: same as in LSM Trees
    In-memory cache: like MemTables
    Index files: like SSTables
  4. In-Memory Cache

        // cache and flush variables
        cacheLock  sync.RWMutex
        cache      map[string]Values
        flushCache map[string]Values

    Example key: temperature,device=dev1,building=b1#internal
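The example key looks like a series key (measurement plus tags) with a field name appended after "#". That reading is an inference from this single example. The hypothetical cacheKey helper below just rebuilds such a string, keeping tags in the order given rather than assuming any sorting.

```go
package main

import (
	"fmt"
	"strings"
)

// cacheKey is a hypothetical helper that rebuilds a key shaped like the one
// on the slide: measurement, comma-separated tag pairs, then "#" and a field.
// Tags are joined in the order provided; no sorting is assumed.
func cacheKey(measurement string, tags [][2]string, field string) string {
	var b strings.Builder
	b.WriteString(measurement)
	for _, t := range tags {
		b.WriteByte(',')
		b.WriteString(t[0])
		b.WriteByte('=')
		b.WriteString(t[1])
	}
	b.WriteByte('#')
	b.WriteString(field)
	return b.String()
}

func main() {
	key := cacheKey("temperature",
		[][2]string{{"device", "dev1"}, {"building", "b1"}},
		"internal")
	fmt.Println(key) // temperature,device=dev1,building=b1#internal
}
```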
  5.
        // cache and flush variables
        cacheLock  sync.RWMutex
        cache      map[string]Values
        flushCache map[string]Values
        dirtySort  map[string]bool

    Mutexes are your friend.
  6. Same code as slide 5, with an added note: values can come in out of order, so they may need sorting later.
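A minimal sketch of how a dirtySort flag could let writes append cheaply and defer sorting until a read. The point type, the write/read method names, and the read-lock-then-write-lock pattern are assumptions for illustration, not the engine's code; flushCache is left out.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// point is a hypothetical stand-in for the engine's Value type.
type point struct {
	unixNano int64
	value    float64
}

// cache mirrors the slide's variables: an RWMutex guarding per-key points,
// plus dirtySort marking keys that received an out-of-order write.
type cache struct {
	mu        sync.RWMutex
	values    map[string][]point
	dirtySort map[string]bool
}

func newCache() *cache {
	return &cache{values: make(map[string][]point), dirtySort: make(map[string]bool)}
}

// write appends points; a point older than the key's last point flags the key.
func (c *cache) write(key string, pts ...point) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for _, p := range pts {
		existing := c.values[key]
		if n := len(existing); n > 0 && p.unixNano < existing[n-1].unixNano {
			c.dirtySort[key] = true
		}
		c.values[key] = append(existing, p)
	}
}

// read returns a time-sorted copy of a key's points, sorting lazily.
func (c *cache) read(key string) []point {
	c.mu.RLock()
	if !c.dirtySort[key] {
		// Fast path: already sorted, a shared read lock is enough.
		out := append([]point(nil), c.values[key]...)
		c.mu.RUnlock()
		return out
	}
	c.mu.RUnlock()

	// Slow path: sorting mutates the slice, so take the write lock and
	// re-check, since another reader may have sorted it in the meantime.
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.dirtySort[key] {
		pts := c.values[key]
		sort.Slice(pts, func(i, j int) bool { return pts[i].unixNano < pts[j].unixNano })
		delete(c.dirtySort, key)
	}
	return append([]point(nil), c.values[key]...)
}

func main() {
	c := newCache()
	c.write("cpu#value", point{10, 1.0}, point{30, 3.0})
	c.write("cpu#value", point{20, 2.0}) // out of order: marks the key dirty
	for _, p := range c.read("cpu#value") {
		fmt.Println(p.unixNano, p.value)
	}
}
```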
  7.
        type Values []Value

        func (v Values) Encode(buf []byte) []byte {
            /* code here */
        }

        // Sort methods
        func (a Values) Len() int      { return len(a) }
        func (a Values) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
        func (a Values) Less(i, j int) bool {
            return a[i].Time().UnixNano() < a[j].Time().UnixNano()
        }
  8. Same code as slide 7, with an added note: Sometimes I want generics… and then I come to my senses.
  9. Finding a specific time

        // Seek will point the cursor to the given time (or key)
        func (c *walCursor) SeekTo(seek int64) (int64, interface{}) {
            // Seek cache index
            c.position = sort.Search(len(c.cache), func(i int) bool {
                return c.cache[i].Time().UnixNano() >= seek
            })
            // more sweet code
        }
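sort.Search returns the smallest index for which the predicate is true, so it finds the right position only if the cached values are already sorted by time, which is what dirtySort guards. A stripped-down sketch over plain int64 timestamps (the seekTo name is hypothetical):

```go
package main

import (
	"fmt"
	"sort"
)

// seekTo returns the index of the first timestamp >= seek in a sorted slice,
// or len(times) if every timestamp is earlier than seek.
func seekTo(times []int64, seek int64) int {
	return sort.Search(len(times), func(i int) bool { return times[i] >= seek })
}

func main() {
	times := []int64{10, 20, 30, 40}
	fmt.Println(seekTo(times, 25)) // 2 (points at 30)
	fmt.Println(seekTo(times, 30)) // 2 (exact match)
	fmt.Println(seekTo(times, 50)) // 4 (past the end)
}
```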
  10. Awesome time series data

    WAL (an append-only file)
    in-memory index
    on-disk index (periodic flushes)
  11. The Index

    Data File  Min Time: 10000  Max Time: 29999
    Data File  Min Time: 30000  Max Time: 39999
    Data File  Min Time: 70000  Max Time: 99999

    Contiguous blocks of time
  12. The Index

    Same three data files as slide 11, now labeled: non-overlapping
  13. The Index

    The three data files from slide 11, plus one spanning their combined range:
    Data File  Min Time: 10000  Max Time: 99999

    They periodically get compacted (like LSM).
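Because each data file covers a non-overlapping block of time, a query only has to open files whose range intersects it, and compacting files yields one file spanning the earliest min to the latest max. A hypothetical sketch using the time ranges from the slides (the fileRange type and both helpers are illustrative names):

```go
package main

import "fmt"

// fileRange records only the time range a data file covers.
type fileRange struct {
	name             string
	minTime, maxTime int64
}

// filesForRange returns files whose [minTime, maxTime] overlaps [start, end];
// every other file can be skipped without being opened.
func filesForRange(files []fileRange, start, end int64) []fileRange {
	var out []fileRange
	for _, f := range files {
		if f.maxTime >= start && f.minTime <= end {
			out = append(out, f)
		}
	}
	return out
}

// compactRange returns the range of the file produced by compacting files
// together. files must be non-empty.
func compactRange(files []fileRange) (lo, hi int64) {
	lo, hi = files[0].minTime, files[0].maxTime
	for _, f := range files[1:] {
		if f.minTime < lo {
			lo = f.minTime
		}
		if f.maxTime > hi {
			hi = f.maxTime
		}
	}
	return lo, hi
}

func main() {
	files := []fileRange{
		{"a", 10000, 29999},
		{"b", 30000, 39999},
		{"c", 70000, 99999},
	}
	for _, f := range filesForRange(files, 35000, 50000) {
		fmt.Println(f.name) // b
	}
	lo, hi := compactRange(files)
	fmt.Println(lo, hi) // 10000 99999
}
```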
  14. Compacting while appending new data

        func (w *WriteLock) LockRange(min, max int64) {
            // sweet code here
        }

        func (w *WriteLock) UnlockRange(min, max int64) {
            // sweet code here
        }
  15. Compacting while appending new data

    Same code as slide 14, with an added note: This should block until we get it.
  16.
        func TestWriteLock_RightIntersect(t *testing.T) {
            w := &tsm1.WriteLock{}
            w.LockRange(2, 10)

            lock := make(chan bool)
            timeout := time.NewTimer(10 * time.Millisecond)
            go func() {
                // [5, 15] overlaps [2, 10], so this call should block
                w.LockRange(5, 15)
                lock <- true
            }()
            select {
            case <-lock:
                t.Fatal("able to get lock when we shouldn't")
            case <-timeout.C:
                // we're all good
            }
        }
  17. Back to the data files…

    Data File  Min Time: 10000  Max Time: 29999
    Data File  Min Time: 30000  Max Time: 39999
    Data File  Min Time: 70000  Max Time: 99999
  18. Memory Mapping

        fInfo, err := f.Stat()
        if err != nil {
            return nil, err
        }
        mmap, err := syscall.Mmap(
            int(f.Fd()), 0, int(fInfo.Size()),
            syscall.PROT_READ, syscall.MAP_SHARED)
        if err != nil {
            return nil, err
        }
  19. Access file like a byte slice

        func (d *dataFile) MinTime() int64 {
            // the min time is stored minTimeOffset bytes before the end of the file
            minTimePosition := d.size - minTimeOffset
            timeBytes := d.mmap[minTimePosition : minTimePosition+timeSize]
            return int64(btou64(timeBytes))
        }
  20. Finding the time for an ID

        func (d *dataFile) StartingPositionForID(id uint64) uint32 {
            seriesCount := d.SeriesCount()
            indexStart := d.indexPosition()

            // binary search over the index entries, which are sorted by ID
            min := uint32(0)
            max := uint32(seriesCount)
            for min < max {
                mid := (max-min)/2 + min
                offset := mid*seriesHeaderSize + indexStart
                checkID := btou64(d.mmap[offset : offset+timeSize])
                if checkID == id {
                    return btou32(d.mmap[offset+timeSize : offset+timeSize+posSize])
                } else if checkID < id {
                    min = mid + 1
                } else {
                    max = mid
                }
            }
            return uint32(0)
        }

    The Index: IDs are sorted
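The same binary search can be written with sort.Search over fixed-size index entries. The 12-byte entry layout (8-byte ID, 4-byte position) and the helper names are hypothetical, and returning an ok flag is a design choice that keeps "not found" distinct from a genuine position of 0.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"sort"
)

// Hypothetical index layout: entries sorted by series ID, each an 8-byte
// big-endian ID followed by a 4-byte big-endian file position.
const (
	idSize    = 8
	posSize   = 4
	entrySize = idSize + posSize
)

// positionForID binary-searches the index for id.
func positionForID(index []byte, id uint64) (pos uint32, ok bool) {
	n := len(index) / entrySize
	i := sort.Search(n, func(j int) bool {
		off := j * entrySize
		return binary.BigEndian.Uint64(index[off:off+idSize]) >= id
	})
	if i == n {
		return 0, false // id is larger than every entry
	}
	off := i * entrySize
	if binary.BigEndian.Uint64(index[off:off+idSize]) != id {
		return 0, false // id falls between entries
	}
	return binary.BigEndian.Uint32(index[off+idSize : off+entrySize]), true
}

// buildIndex encodes (id, position) pairs; ids must already be sorted.
func buildIndex(ids []uint64, positions []uint32) []byte {
	buf := make([]byte, len(ids)*entrySize)
	for i := range ids {
		off := i * entrySize
		binary.BigEndian.PutUint64(buf[off:off+idSize], ids[i])
		binary.BigEndian.PutUint32(buf[off+idSize:off+entrySize], positions[i])
	}
	return buf
}

func main() {
	index := buildIndex([]uint64{3, 7, 42}, []uint32{0, 512, 2048})
	fmt.Println(positionForID(index, 7)) // 512 true
	fmt.Println(positionForID(index, 8)) // 0 false
}
```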
  21. Test last night:

    100,000 series
    100,000 points per series
    10,000,000,000 total points
    5,000 points per request
    c3.8xlarge, writes from 4 other systems
    ~390,000 points/sec
    ~3 bytes/point (random floats, could be better)
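A back-of-the-envelope reading of these figures, assuming the ~390,000 points/sec rate held for the whole load (N is total points, t load time, S compressed size, r request rate across all writers):

```latex
\begin{aligned}
N &= 100{,}000 \times 100{,}000 = 10^{10}\ \text{points} \\
t &\approx \frac{10^{10}}{390{,}000\ \text{points/s}} \approx 25{,}600\ \text{s} \approx 7.1\ \text{h} \\
S &\approx 10^{10} \times 3\ \text{bytes} = 3 \times 10^{10}\ \text{bytes} \approx 30\ \text{GB} \\
r &\approx \frac{390{,}000\ \text{points/s}}{5{,}000\ \text{points/request}} = 78\ \text{requests/s}
\end{aligned}
```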