// scanHelper.cs 4.67 KB
using Google.Cloud.Bigtable.V2;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading.Tasks;

namespace Google.Cloud.Bigtable.ScanTest
{
    /// <summary>
    /// Repeatedly issues row-limited Bigtable scans that start at a random row key,
    /// counts the rows seen in each scan, and records per-scan latency and throughput
    /// in the shared <c>histogramHelper</c> histograms and <c>Program</c> counters.
    /// </summary>
    class scanHelper
    {
        // One generator shared by all calls: creating a new Random per request can
        // produce identical sequences on time-seeded runtimes when calls are made in
        // quick succession. Random is not thread-safe, hence the lock.
        private static readonly Random _rng = new Random();
        private static readonly object _rngLock = new object();

        private readonly AppSettings _appSettings;
        private readonly V2.TableName _table;
        private readonly BigtableClient _client;

        /// <summary>
        /// Creates a helper bound to <paramref name="tbl"/>. Settings are taken from
        /// <c>Program.AppSetting</c> and the client from <c>BigtableClient.Create()</c>.
        /// </summary>
        /// <param name="tbl">The table every scan request is issued against.</param>
        public scanHelper(V2.TableName tbl)
        {
            _appSettings = Program.AppSetting;
            _table = tbl;
            _client = BigtableClient.Create();
        }

        /// <summary>
        /// Runs scans back to back while <c>Program.ScanTime</c> is below the configured
        /// <c>ScanInterval</c>. Any exception ends the loop and is reported on the console
        /// instead of being propagated to the caller.
        /// </summary>
        /// <param name="batchLimit">Maximum number of rows requested per scan.</param>
        /// <returns>A task that completes when the scan loop has finished or failed.</returns>
        public async Task scan(int batchLimit)
        {
            Stopwatch totalTimer = Stopwatch.StartNew();
            Stopwatch readTimer = new Stopwatch();

            try
            {
                Console.WriteLine($"{DateTime.Now:yyyy-MM-dd HH:mm:ss} - Table {_appSettings.TableName} read starting for batchsize {batchLimit}.");

                while (Program.ScanTime < _appSettings.ScanInterval)
                {
                    long scannedInBatch = 0;
                    ReadRowsRequest request = getReadRequest(batchLimit);

                    readTimer.Restart();
                    var response = _client.ReadRows(request);
                    ++Program.ScanCount;

                    while (await response.ResponseStream.MoveNext(default))
                    {
                        var chunks = response.ResponseStream.Current.Chunks;
                        int index = 0;

                        while (index < chunks.Count)
                        {
                            // The chunk at 'index' is counted as the start of a record.
                            // NOTE(review): the first chunk of every response is counted even
                            // if its RowKey is empty (original behaviour) - confirm intended.
                            ++Program.RecordScanned;
                            ++scannedInBatch;
                            ++index;

                            // Skip the following chunks that carry no row key; they are treated
                            // as belonging to the record just counted. The bounds check must come
                            // first: the counted chunk may be the last one in this response.
                            while (index < chunks.Count && chunks[index].RowKey.Length == 0)
                            {
                                ++index;
                            }
                        }
                    }

                    readTimer.Stop();
                    long responseTime = readTimer.ElapsedMilliseconds;

                    // Records per millisecond. A scan that completes in under 1 ms reports
                    // 0 elapsed milliseconds, so clamp the divisor to avoid DivideByZeroException.
                    long throughput = scannedInBatch / Math.Max(responseTime, 1L);
                    histogramHelper.ThroughputHistogram.RecordValue(throughput);
                    histogramHelper.ScanResponseHistogram.RecordValue(responseTime);

                    Program.ThroughputTime += responseTime;
                }
            }
            catch (Exception ex)
            {
                totalTimer.Stop();

                Console.WriteLine($"{DateTime.Now:yyyy-MM-dd HH:mm:ss} - Exception during scan. Performed {Program.ScanCount} scans consuming {Program.RecordScanned} rows in {totalTimer.ElapsedMilliseconds} milliseconds., error message {ex.Message}");
            }

            totalTimer.Stop();

            // NOTE(review): this replaces the per-scan total accumulated inside the loop
            // with the wall-clock time of the whole call - kept as in the original; confirm.
            Program.ThroughputTime = totalTimer.ElapsedMilliseconds;
        }

        /// <summary>
        /// Builds a read request for at most <paramref name="limit"/> rows, starting
        /// (inclusive) at a randomly chosen row key and open-ended at the top, restricted
        /// to the configured column family and one cell per column.
        /// </summary>
        /// <param name="limit">Value assigned to the request's <c>RowsLimit</c>.</param>
        /// <returns>The populated <c>ReadRowsRequest</c>.</returns>
        private ReadRowsRequest getReadRequest(int limit)
        {
            int startPosition;
            lock (_rngLock)
            {
                startPosition = _rng.Next(0, (int)_appSettings.Records);
            }

            // Row key = prefix + start position zero-padded to RowKeySize digits.
            string startKeyText = _appSettings.RowKeyPrefix + startPosition.ToString("D" + _appSettings.RowKeySize);
            Google.Protobuf.ByteString startRowKey = Google.Protobuf.ByteString.CopyFromUtf8(startKeyText);

            // RowFilter is a protobuf oneof: assigning two filter properties on the same
            // instance keeps only the last one. Both conditions are therefore chained.
            RowFilter rowFilter = new RowFilter
            {
                Chain = new RowFilter.Types.Chain
                {
                    Filters =
                    {
                        new RowFilter { FamilyNameRegexFilter = _appSettings.ColumnFamily },
                        new RowFilter { CellsPerColumnLimitFilter = 1 }
                    }
                }
            };

            // No end key is set; the number of rows returned is bounded by RowsLimit.
            ReadRowsRequest request = new ReadRowsRequest
            {
                TableNameAsTableName = _table,
                Rows = new RowSet
                {
                    RowRanges = { new RowRange { StartKeyClosed = startRowKey } }
                },
                RowsLimit = limit,
                Filter = rowFilter
            };

            return request;
        }
    }
}