scanHelper.cs
4.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
using Google.Cloud.Bigtable.V2;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading.Tasks;
namespace Google.Cloud.Bigtable.ScanTest
{
class scanHelper
{
private AppSettings _appSettings;
private V2.TableName _table;
private BigtableClient _client;
public scanHelper(V2.TableName tbl)
{
_appSettings = Program.AppSetting;
_table = tbl;
_client = BigtableClient.Create();
}
public async Task scan(int batchLimit)
{
Stopwatch _startTime = Stopwatch.StartNew();
long _responseTime = 0;
Stopwatch _startRead = Stopwatch.StartNew();
try
{
Console.WriteLine($"{DateTime.Now:yyyy-MM-dd HH:mm:ss} - Table {_appSettings.TableName} read starting for batchsize {batchLimit}.");
long _scannedRecordInBatch = 0;
while (Program.ScanTime < _appSettings.ScanInterval)
{
_scannedRecordInBatch = 0;
ReadRowsRequest _readRequest = getReadRequest(batchLimit);
_startRead.Restart();
var response = _client.ReadRows(_readRequest);
++Program.ScanCount;
//response.GrpcCall.ResponseHeadersAsync.Wait();
while (await response.ResponseStream.MoveNext(default))
{
var current = response.ResponseStream.Current;
for (int index = 0; index < current.Chunks.Count;)
{
++Program.RecordScanned;
++_scannedRecordInBatch;
++index;
while (current.Chunks[index].RowKey.Length == 0)
{
++index;
if (index >= current.Chunks.Count)
break;
}
}
}
_startRead.Stop();
_responseTime = _startRead.ElapsedMilliseconds;
long _currentScanThroughput = _scannedRecordInBatch / _responseTime ;
histogramHelper.ThroughputHistogram.RecordValue(_currentScanThroughput);
histogramHelper.ScanResponseHistogram.RecordValue(_startRead.ElapsedMilliseconds);
#if DEBUG
//Console.WriteLine($"{DateTime.Now:yyyy-MM-dd HH:mm:ss} - Recently scanned {_scannedRecordInBatch} in {_responseTime} milliseconds.");
#endif
Program.ThroughputTime += _responseTime;
}
}
catch (Exception ex)
{
_startTime.Stop();
Console.WriteLine($"{DateTime.Now:yyyy-MM-dd HH:mm:ss} - Exception during scan. Performed {Program.ScanCount} scans consuming {Program.RecordScanned} rows in {_startTime.ElapsedMilliseconds} milliseconds., error message {ex.Message}");
}
_startTime.Stop();
Program.ThroughputTime = _startTime.ElapsedMilliseconds;
}
private ReadRowsRequest getReadRequest(int limit)
{
Random rand = new Random();
int _startPosition = rand.Next(0, (int)_appSettings.Records);
StringBuilder sbStartRowKey = new StringBuilder(_appSettings.RowKeyPrefix + _startPosition.ToString("D" + _appSettings.RowKeySize));
//StringBuilder sbEndRowKey = new StringBuilder(_appSettings.RowKeyPrefix + (_startPosition + _appSettings.ScanLimit - 1).ToString("D" + _appSettings.RowKeySize));
Google.Protobuf.ByteString startRowKey = Google.Protobuf.ByteString.CopyFromUtf8(sbStartRowKey.ToString());
// Google.Protobuf.ByteString endRowKey = Google.Protobuf.ByteString.CopyFromUtf8(sbEndRowKey.ToString());
RowFilter _rowFilter = new RowFilter();
_rowFilter.FamilyNameRegexFilter = _appSettings.ColumnFamily;
_rowFilter.CellsPerColumnLimitFilter = 1;
List<RowRange> _rowRanges = new List<RowRange>();
_rowRanges.Add(new RowRange() { StartKeyClosed = startRowKey }); //, EndKeyClosed = endRowKey
ReadRowsRequest _readRequest = new ReadRowsRequest()
{
TableNameAsTableName = _table
,
Rows = new RowSet { RowRanges = { _rowRanges[0] } }
};
_readRequest.RowsLimit = limit;
_readRequest.Filter = _rowFilter;
return _readRequest;
}
}
}