// loadDataHelper.cs 7.02 KB
using Google.Cloud.Bigtable.V2;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading;
using HdrHistogram;
using System.Threading.Tasks;
using Google.Api.Gax.Grpc;

namespace Google.Cloud.Bigtable.ScanTest
{
    /// <summary>
    /// Populates a Bigtable table with randomly generated rows, splitting the
    /// configured number of records across several concurrent workers and
    /// recording the latency of every batch request in
    /// <c>histogramHelper.LoadHistogram</c>.
    /// </summary>
    class loadDataHelper
    {
        // Latencies are recorded in units of 1/100000 second (10 microseconds).
        private const long HistogramUnitsPerSecond = 100000;

        private readonly AppSettings _appSettings;
        private readonly V2.TableName _table;
        private readonly BigtableClient _client;

        // Only used on the thread that calls loadTable() to seed the per-worker
        // generators; System.Random instances are not safe for concurrent use.
        private readonly Random _rnd = new Random();

        /// <summary>
        /// Creates a helper that loads data into the given table using the
        /// settings exposed by <c>Program.AppSetting</c>.
        /// </summary>
        /// <param name="_tableName">Table that will receive the generated rows.</param>
        public loadDataHelper(V2.TableName _tableName)
        {
            _appSettings = Program.AppSetting;
            _table = _tableName;
            _client = BigtableClient.Create();

            // NOTE(review): the flag is raised at construction time, before any
            // row has been written - confirm this is what the readers of
            // Program.IsDataLoaded expect.
            Program.IsDataLoaded = true;
        }

        /// <summary>
        /// Loads <c>Records</c> rows into the table using <c>LoadThreads</c>
        /// workers. Every worker gets an equal share of the records; the last
        /// worker additionally takes the remainder of the division. Blocks until
        /// all workers have finished and prints a summary to the console.
        /// </summary>
        public void loadTable()
        {
            Console.WriteLine($"{DateTime.Now:yyyy-MM-dd HH:mm:ss} - Loading {_appSettings.Records} records into table {_appSettings.TableName}. Threads: {_appSettings.LoadThreads}. Cluster: {_appSettings.InstanceId}");

            try
            {
                long recordsPerThread = _appSettings.Records / _appSettings.LoadThreads;
                long remainder = _appSettings.Records % _appSettings.LoadThreads;

                var tasks = new List<Task<int>>();

                for (int threadCount = 0; threadCount < _appSettings.LoadThreads; threadCount++)
                {
                    // Copy everything the closure needs into per-iteration locals:
                    // the for-loop variable itself is shared between iterations.
                    int threadNumber = threadCount;
                    long firstRecordNumber = threadNumber * recordsPerThread;
                    long totalRecords = threadCount == _appSettings.LoadThreads - 1
                        ? recordsPerThread + remainder
                        : recordsPerThread;

                    // One generator per worker, seeded here on the calling thread.
                    Random workerRandom = new Random(_rnd.Next());

                    // The workers block synchronously on every request, so ask the
                    // scheduler for dedicated threads rather than pool threads.
                    tasks.Add(Task<int>.Factory.StartNew(
                        () => processLoad(threadNumber, firstRecordNumber, totalRecords, workerRandom),
                        TaskCreationOptions.LongRunning));
                }

                Task.WaitAll(tasks.ToArray());

                long failedBatches = 0;
                foreach (Task<int> task in tasks)
                {
                    failedBatches += task.Result;
                }

                if (failedBatches == 0)
                {
                    Console.WriteLine($"{DateTime.Now:yyyy-MM-dd HH:mm:ss} - Data loaded successfully into table {_appSettings.TableName}");
                }
                else
                {
                    Console.WriteLine($"{DateTime.Now:yyyy-MM-dd HH:mm:ss} - Data load into table {_appSettings.TableName} finished with {failedBatches} failed batch request(s)");
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine($"{DateTime.Now:yyyy-MM-dd HH:mm:ss} - Exception while loading data, error message: {ex.Message}");
            }
        }

        /// <summary>
        /// Worker body: writes <paramref name="totalRecords"/> consecutive rows
        /// starting at <paramref name="firstRecordNumber"/>, sending them in
        /// batches of <c>BatchSize</c> entries (the final batch may be smaller).
        /// The duration of every batch request, successful or not, is recorded
        /// in the load histogram.
        /// </summary>
        /// <param name="threadNumber">Zero-based worker index, used for diagnostics only.</param>
        /// <param name="firstRecordNumber">Record number of the first row this worker writes.</param>
        /// <param name="totalRecords">Number of rows this worker writes.</param>
        /// <param name="rnd">Random generator owned exclusively by this worker.</param>
        /// <returns>
        /// Number of batch requests that failed; an unexpected error that aborts
        /// the worker is counted as one additional failure.
        /// </returns>
        private int processLoad(int threadNumber, long firstRecordNumber, long totalRecords, Random rnd)
        {
            int failedBatches = 0;

            try
            {
                MutateRowsRequest request = new MutateRowsRequest { TableNameAsTableName = _table };

                // 64-bit counter: a worker's share may exceed int.MaxValue.
                for (long insertCount = 0; insertCount < totalRecords; insertCount++)
                {
                    long recordNumber = firstRecordNumber + insertCount;
                    string rowKey = _appSettings.RowKeyPrefix + recordNumber.ToString("D" + _appSettings.RowKeySize);

                    MutateRowsRequest.Types.Entry entry = new MutateRowsRequest.Types.Entry();
                    entry.RowKey = Protobuf.ByteString.CopyFromUtf8(rowKey);
                    entry.Mutations.Add(MutationsBuilder(rnd));
                    request.Entries.Add(entry);

                    bool batchIsFull = request.Entries.Count == _appSettings.BatchSize;
                    bool isLastRecord = insertCount == totalRecords - 1;
                    if (!batchIsFull && !isLastRecord)
                    {
                        continue;
                    }

#if DEBUG
                    Console.WriteLine($"{DateTime.Now:yyyy-MM-dd HH:mm:ss} - Thread: {threadNumber + 1} Inserting record {insertCount + 1}.  Last rowkey {rowKey} for table {_appSettings.TableName}");
#endif
                    long startTime = Stopwatch.GetTimestamp();

                    try
                    {
                        BigtableClient.MutateRowsStream response = _client.MutateRows(
                            request,
                            CallSettings.FromCallTiming(CallTiming.FromTimeout(TimeSpan.FromMilliseconds(_appSettings.MutateRowTimeOutInMilliSeconds))));

                        // NOTE(review): only the response headers are awaited; the
                        // per-entry statuses in the response stream are not
                        // inspected - confirm that is sufficient for this test.
                        response.GrpcCall.ResponseHeadersAsync.Wait();
                    }
                    catch (Exception ex)
                    {
                        failedBatches++;
#if DEBUG
                        // Wait() wraps the failure in an AggregateException, so report the root cause.
                        Console.WriteLine($"{DateTime.Now:yyyy-MM-dd HH:mm:ss} - Exception inserting record {insertCount + 1}. Last rowkey {rowKey} for table {_appSettings.TableName}. Error Message: {ex.GetBaseException().Message}");
#endif
                    }

                    // Multiply before dividing: dividing Frequency by the unit first
                    // truncates the divisor and fails when Frequency < 100000.
                    long elapsedTicks = Stopwatch.GetTimestamp() - startTime;
                    long responseTime = elapsedTicks * HistogramUnitsPerSecond / Stopwatch.Frequency;
                    histogramHelper.LoadHistogram.RecordValue(responseTime);

                    request = new MutateRowsRequest { TableNameAsTableName = _table };
                }
            }
            catch (Exception ex)
            {
                // The rest of this worker's range is not loaded, so always report it.
                failedBatches++;
                Console.WriteLine($"{DateTime.Now:yyyy-MM-dd HH:mm:ss} Exception to load data for table {_appSettings.TableName}. Error Message: {ex.Message}");
            }

            return failedBatches;
        }

        /// <summary>
        /// Builds the mutations for one row: one SetCell per configured column,
        /// named <c>ColumnPrefix</c> followed by the column index and filled
        /// with random bytes.
        /// </summary>
        /// <param name="rnd">Random generator owned by the calling worker.</param>
        /// <returns>An array with <c>Columns</c> mutations.</returns>
        private Mutation[] MutationsBuilder(Random rnd)
        {
            Mutation[] columns = new Mutation[_appSettings.Columns];

            for (int cellCount = 0; cellCount < columns.Length; cellCount++)
            {
                columns[cellCount] = Mutations.SetCell(_appSettings.ColumnFamily, $"{_appSettings.ColumnPrefix}{cellCount}", new BigtableByteString(RandomData(rnd)));
            }

            return columns;
        }

        /// <summary>
        /// Produces <c>ColumnLength</c> random bytes to use as a cell value.
        /// </summary>
        /// <param name="rnd">Random generator owned by the calling worker.</param>
        /// <returns>A newly allocated buffer of random bytes.</returns>
        private Byte[] RandomData(Random rnd)
        {
            Byte[] buffer = new Byte[_appSettings.ColumnLength];
            rnd.NextBytes(buffer);
            return buffer;
        }
    }
}