網(wǎng)上有很多關(guān)于pos機系統(tǒng)源碼,美團(tuán)leaf源碼解析的知識,也有很多人為大家解答關(guān)于pos機系統(tǒng)源碼的問題,今天pos機之家(m.nxzs9ef.cn)為大家整理了關(guān)于這方面的知識,讓我們一起來看下吧!
本文目錄一覽:
pos機系統(tǒng)源碼
leaf提供了兩種id生成方式一種是基于mysql分段雙buffer模式,一種是基于zookeeper的。
雪花id生成器美團(tuán)使用的是zookeeper實現(xiàn)的,zookeeper在此中間件中扮演的角色總結(jié)如下,用來存儲每個分布式節(jié)點的ip,port信息,主要是每天機器的時間戳,這個信息用于保障每個節(jié)點生成的id不會出現(xiàn)回?fù)墁F(xiàn)象,出現(xiàn)回?fù)艿脑蚍治鋈缦?/p>
41bit的計算方式是機器的當(dāng)前時間減去系統(tǒng)初始化的時間的時間戳,leaf給的是 :Thu Nov 04 2010 09:42:54 GMT+0800 (中國標(biāo)準(zhǔn)時間) 1288834974657L的差值填充41bit的空間,從id的組成我們可以知道10bit的工作機器id不同就能保障一定的程度id不同性,但是人如果41bit的位置出現(xiàn)時間回?fù)芎竽敲磫我粀orker生成的id就可能會出現(xiàn)重復(fù)的id。
SnowflakeZookeeperHolder
這個類主要是處理zk的連接,以及數(shù)據(jù)存儲策略,包括如果分配workerid,以及workerid也會存儲在每個節(jié)點的本地化文件中,從代碼中我們可以知道,每個zk的path是這樣的/snowflake/leafname/forever/ip:port-0000000001 后面的數(shù)字就是workerid由zk分配的
public class SnowflakeZookeeperHolder { private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeZookeeperHolder.class); private String zk_Addressnode = null;//保存自身的key ip:port-000000001 private String listenAddress = null;//保存自身的key ip:port private int workerID; private static final String PREFIX_ZK_PATH = "/snowflake/" + PropertyFactory.getProperties().getProperty("leaf.name"); private static final String PROP_PATH = System.getProperty("java.io.tmpdir") + File.separator + PropertyFactory.getProperties().getProperty("leaf.name") + "/leafconf/{port}/workerID.properties"; private static final String PATH_FOREVER = PREFIX_ZK_PATH + "/forever";//保存所有數(shù)據(jù)持久的節(jié)點 private String ip; private String port; private String connectionString; private long lastUpdateTime; public SnowflakeZookeeperHolder(String ip, String port, String connectionString) { this.ip = ip; this.port = port; this.listenAddress = ip + ":" + port; this.connectionString = connectionString; } /** * 啟動應(yīng)用使用zk的curator客戶端連接zk,判斷l(xiāng)eaf指定業(yè)務(wù)的根節(jié)點是否存在,leaf中使用的劃分邏輯如下 * /snowflake/leafname/forever/ip:port-0000000001 * /snowflake/leafname(不同業(yè)務(wù)可以使用不同名字)訂單,用戶/forever/ip:port(這里指的是提供id生產(chǎn)服務(wù)的機器)-0000000001(順序節(jié)點序號) * 里面的內(nèi)容存的是endpoint內(nèi)容{"ip","xxx.xxx.xxx.xxx","port":"8080","timestamp":"timestamp"} * 先掃描業(yè)務(wù)目錄,如果業(yè)務(wù)目錄不存在那么說明第一次啟動這個業(yè)務(wù),因此當(dāng)前主機要把自己的信息保存進(jìn)去,默認(rèn)id為0 */ public boolean init() { try { CuratorFramework curator = createWithOptions(connectionString, new RetryUntilElapsed(1000, 4), 10000, 6000); curator.start(); Stat stat = curator.checkExists().forPath(PATH_FOREVER); if (stat == null) { //不存在根節(jié)點,機器第一次啟動,創(chuàng)建/snowflake/ip:port-000000000,并上傳數(shù)據(jù) zk_AddressNode = createNode(curator); //worker id 默認(rèn)是0 //guozc 潛在并發(fā)問題??兩個節(jié)點同時去創(chuàng)建節(jié)點都成功了,因為worker默認(rèn)是0可能會造成本地文件存儲的id為0,極端情況下? updateLocalWorkerID(workerID); //定時上報本機時間給forever節(jié)點 //定時任務(wù)每三秒鐘上報一下本機信息,里面關(guān)鍵信息是每次上報的時間戳,防止數(shù)顯時鐘回?fù)? ScheduledUploadData(curator, zk_AddressNode); return true; } else { //業(yè)務(wù)目錄存在那么就檢查是否有自己的節(jié)點 Map<String, Integer> nodeMap = Maps.newHashMap();//ip:port->00001 Map<String, String> realNode = Maps.newHashMap();//ip:port->(ipport-000001) //存在根節(jié)點,先檢查是否有屬于自己的根節(jié)點 List<String> keys = curator.getChildren().forPath(PATH_FOREVER); for (String key : keys) { String[] nodeKey = key.split("-"); realNode.put(nodeKey[0], key); nodeMap.put(nodeKey[0], Integer.parseInt(nodeKey[1])); } Integer workerid = nodeMap.get(listenAddress); if (workerid != null) { //有自己的節(jié)點,zk_AddressNode=ip:port zk_AddressNode = PATH_FOREVER + "/" + realNode.get(listenAddress); workerID = workerid;//啟動worder時使用會使用 if (!checkInitTimeStamp(curator, zk_AddressNode)) { throw new CheckLastTimeException("init timestamp check error,forever node timestamp gt this node time"); } //準(zhǔn)備創(chuàng)建臨時節(jié)點 doService(curator); updateLocalWorkerID(workerID); LOGGER.info("[Old NODE]find forever node have this endpoint ip-{} port-{} workid-{} childnode and start SUCCESS", ip, port, workerID); } else { //表示新啟動的節(jié)點,創(chuàng)建持久節(jié)點 ,不用check時間 String newNode = createNode(curator); zk_AddressNode = newNode; String[] nodeKey = newNode.split("-"); workerID = Integer.parseInt(nodeKey[1]); doService(curator); updateLocalWorkerID(workerID); LOGGER.info("[New NODE]can not find node on forever node that endpoint ip-{} port-{} workid-{},create own node on forever node and start SUCCESS ", ip, port, workerID); } } } catch (Exception e) { LOGGER.error("Start node ERROR {}", e); try { Properties properties = new Properties(); properties.load(new FileInputStream(new File(PROP_PATH.replace("{port}", port + "")))); workerID = Integer.valueOf(properties.getProperty("workerID")); LOGGER.warn("START FAILED ,use local node file properties workerID-{}", workerID); } catch (Exception e1) { LOGGER.error("Read file error ", e1); return false; } } return true; } private void doService(CuratorFramework curator) { ScheduledUploadData(curator, zk_AddressNode);// /snowflake_forever/ip:port-000000001 } private void ScheduledUploadData(final CuratorFramework curator, final String zk_AddressNode) { Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r, "schedule-upload-time"); thread.setDaemon(true); return thread; } }).scheduleWithFixedDelay(new Runnable() { @Override public void run() { updateNewData(curator, zk_AddressNode); } }, 1L, 3L, TimeUnit.SECONDS);//每3s上報數(shù)據(jù) } /** * 檢查zookeeper中數(shù)據(jù)是否小于當(dāng)前機器的系統(tǒng)時間,因為每個機器在zk中都有一個自己的節(jié)點用于存儲endpoint數(shù)據(jù) */ private boolean checkInitTimeStamp(CuratorFramework curator, String zk_AddressNode) throws Exception { byte[] bytes = curator.getData().forPath(zk_AddressNode); Endpoint endPoint = deBuildData(new String(bytes)); //該節(jié)點的時間不能小于最后一次上報的時間 return !(endPoint.getTimestamp() > System.currentTimeMillis()); } /** * 創(chuàng)建持久順序節(jié)點 ,并把節(jié)點數(shù)據(jù)放入 value * * @param curator * @return * @throws Exception */ private String createNode(CuratorFramework curator) throws Exception { try { return curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(PATH_FOREVER + "/" + listenAddress + "-", buildData().getBytes()); } catch (Exception e) { LOGGER.error("create node error msg {} ", e.getMessage()); throw e; } } private void updateNewData(CuratorFramework curator, String path) { try { if (System.currentTimeMillis() < lastUpdateTime) { return; } curator.setData().forPath(path, buildData().getBytes()); lastUpdateTime = System.currentTimeMillis(); } catch (Exception e) { LOGGER.info("update init data error path is {} error is {}", path, e); } } /** * 構(gòu)建需要上傳的數(shù)據(jù) * * @return */ private String buildData() throws JsonProcessingException { Endpoint endpoint = new Endpoint(ip, port, System.currentTimeMillis()); ObjectMapper mapper = new ObjectMapper(); String json = mapper.writeValueAsString(endpoint); return json; } /** * 將json字符串轉(zhuǎn)換為endpoint對象 */ private Endpoint deBuildData(String json) throws IOException { ObjectMapper mapper = new ObjectMapper(); Endpoint endpoint = mapper.readValue(json, Endpoint.class); return endpoint; } /** * 在節(jié)點文件系統(tǒng)上緩存一個workid值,zk失效,機器重啟時保證能夠正常啟動 * * @param workerID */ private void updateLocalWorkerID(int workerID) { File leafConfFile = new File(PROP_PATH.replace("{port}", port)); boolean exists = leafConfFile.exists(); LOGGER.info("file exists status is {}", exists); if (exists) { try { FileUtils.writeStringToFile(leafConfFile, "workerID=" + workerID, false); LOGGER.info("update file cache workerID is {}", workerID); } catch (IOException e) { LOGGER.error("update file cache error ", e); } } else { //不存在文件,父目錄頁肯定不存在 try { boolean mkdirs = leafConfFile.getParentFile().mkdirs(); LOGGER.info("init local file cache create parent dis status is {}, worker id is {}", mkdirs, workerID); if (mkdirs) { if (leafConfFile.createNewFile()) { FileUtils.writeStringToFile(leafConfFile, "workerID=" + workerID, false); LOGGER.info("local file cache workerID is {}", workerID); } } else { LOGGER.warn("create parent dir error==="); } } catch (IOException e) { LOGGER.warn("craete workerID conf file error", e); } } } private CuratorFramework createWithOptions(String connectionString, RetryPolicy retryPolicy, int connectionTimeoutMs, int sessionTimeoutMs) { return CuratorFrameworkFactory.builder().connectString(connectionString) .retryPolicy(retryPolicy) .connectionTimeoutMs(connectionTimeoutMs) .sessionTimeoutMs(sessionTimeoutMs) .build(); } /** * 上報數(shù)據(jù)結(jié)構(gòu) */ static class Endpoint { private String ip; private String port; private long timestamp; public Endpoint() { } public Endpoint(String ip, String port, long timestamp) { this.ip = ip; this.port = port; this.timestamp = timestamp; } public String getIp() { return ip; } public void setIp(String ip) { this.ip = ip; } public String getPort() { return port; } public void setPort(String port) { this.port = port; } public long getTimestamp() { return timestamp; } public void setTimestamp(long timestamp) { this.timestamp = timestamp; } } public String getZk_AddressNode() { return zk_AddressNode; } public void setZk_AddressNode(String zk_AddressNode) { this.zk_AddressNode = zk_AddressNode; } public String getListenAddress() { return listenAddress; } public void setListenAddress(String listenAddress) { this.listenAddress = listenAddress; } public int getWorkerID() { return workerID; } public void setWorkerID(int workerID) { this.workerID = workerID; } public static void main(String[] args) { try { System.out.println(Integer.parseInt("0000000008")); } catch (Exception e) { e.printStackTrace(); } }}
另外一個核心類是雪花的實現(xiàn)細(xì)節(jié),包括了一個id是如何生成的,還有就是時間是如何更新到zk的,還有就是人如果當(dāng)前毫秒中id分配不夠了leaf是如何處理的
/** * 使用位運算拼接一個long類型的id出來,主要是利用時間做高41位的內(nèi)容,中間是10bit的機器id,也基本夠用了,一個服務(wù)的id生成理論上不會超過1023個服務(wù)節(jié)點 * 最后的12bit用來做遞增 */public class SnowflakeIDGenImpl implements IDGen { @Override public boolean init() { return true; } private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeIDGenImpl.class); //開始時間戳 //個數(shù)字可以指定為任意小于當(dāng)前時間的數(shù)字,這樣就能讓競爭對手無法知道我們的id信息了,這就解決了基于mysql的segement方案造成的容易被競爭對手監(jiān)控的問題了,因為有時間維度的參與,對手不知道我們每時每刻的id發(fā)放信息 private final long twepoch; //wokerid占用的位數(shù) private final long workerIdBits = 10L; //worker最大的id1023 private final long maxWorkerId = ~(-1L << workerIdBits);//最大能夠分配的workerid =1023 //每毫秒的id數(shù)字最大值 private final long sequenceBits = 12L; //workerid的偏移量 private final long workerIdShift = sequenceBits; //時間戳位移數(shù) private final long timestampLeftShift = sequenceBits + workerIdBits; //每個毫秒生成id數(shù)的掩碼,用來進(jìn)行與運算提高運算效率,他讓自己的高位是1,其他都是0那么在與的時候如果沒滿則是sequence 如果是0說明與的那個數(shù)字二進(jìn)制后12位全是0了,也就是滿了,因此會休息一個死循環(huán)的時間然后繼續(xù)生成id private final long sequenceMask = ~(-1L << sequenceBits); //工作節(jié)點的id private long workerId; //每個毫秒自增id數(shù)字 private long sequence = 0L; //保存上次生成id時候的時間戳 private long lastTimestamp = -1L; //new一個隨機函數(shù)對象,多線程公用,并發(fā)性問題交給了synchronized關(guān)鍵字,并且公用對象后降低了new的成本 private static final Random RANDOM = new Random(); public SnowflakeIDGenImpl(String zkAddress, int port) { //Thu Nov 04 2010 09:42:54 GMT+0800 (中國標(biāo)準(zhǔn)時間) this(zkAddress, port, 1288834974657L); } /** * @param zkAddress zk地址 * @param port snowflake監(jiān)聽端口 * @param twepoch * * 初始化應(yīng)用,主要是SnowflakeZookeeperHolder 里面的初始化,包括節(jié)點的創(chuàng)建,或者數(shù)據(jù)的同步,還有主要是完成時間的檢查,方式工作節(jié)點始終回?fù)? * 并且將workerid進(jìn)行賦值,方便生成id時候使用 */ public SnowflakeIDGenImpl(String zkAddress, int port, long twepoch) { this.twepoch = twepoch; Preconditions.checkArgument(timeGen() > twepoch, "Snowflake not support twepoch gt currentTime"); final String ip = Utils.getIp(); SnowflakeZookeeperHolder holder = new SnowflakeZookeeperHolder(ip, String.valueOf(port), zkAddress); LOGGER.info("twepoch:{} ,ip:{} ,zkAddress:{} port:{}", twepoch, ip, zkAddress, port); boolean initFlag = holder.init(); if (initFlag) { workerId = holder.getWorkerID(); LOGGER.info("START SUCCESS USE ZK WORKERID-{}", workerId); } else { Preconditions.checkArgument(initFlag, "Snowflake Id Gen is not init ok"); } Preconditions.checkArgument(workerId >= 0 && workerId <= maxWorkerId, "workerID must gte 0 and lte 1023"); } /** * 核心方法這個就是獲得雪花id的方法,因為是按照時間軸進(jìn)行發(fā)布的,因此不存在不同的業(yè)務(wù)key的隔離,因為所有的業(yè)務(wù)的id都不會重復(fù),(就是這么的任性) * 先取到系統(tǒng)時間戳,然后跟對象中的 lastTimestamp比較如果系統(tǒng)時間比對象時間回?fù)芰?毫秒那么久稍作休息wait一下,也就是等待兩倍的毫秒數(shù),因為左移動1二進(jìn)制翻一倍, * 如果線程醒過來后還是有偏移量那么就返回錯誤。如果偏移量超過5毫秒,那么代表著偏移量太大,那么就返回錯誤, * 如果對象中的 lastTimestamp 與當(dāng)前機器中系統(tǒng)時間一樣,這里面說明一下,這種情況下肯定是比較高的并發(fā)情況下的必然了,因為每次發(fā)放id后對象時間都會被置為當(dāng)時取的系統(tǒng)時間 * 也就是一個毫秒中會發(fā)憷多個id,那么處理邏輯就是給 sequence不停的加一,這里面的與其實就是2的12次方-1,也就是整了個sequence的最大值,這樣出現(xiàn)0代表 sequence + 1變成了 * 2的12次方-1了,那么也就是意味著并發(fā)真的很大,一毫秒中的id被打光了,那么系統(tǒng)就調(diào)用 tilNextMillis 進(jìn)行死循環(huán)的等待,因為這種等待是毫秒級的,所以使用了while循環(huán) * 如果是新的毫秒是就生成一個隨機數(shù),作為sequence的新值,緊接著對lastTimestamp賦值,然后利用位運算生成一個long的id進(jìn)行返回 */ @Override public synchronized Result get(String key) { long timestamp = timeGen(); if (timestamp < lastTimestamp) { long offset = lastTimestamp - timestamp; if (offset <= 5) { try { wait(offset << 1); timestamp = timeGen(); if (timestamp < lastTimestamp) { return new Result(-1, Status.EXCEPTION); } } catch (InterruptedException e) { LOGGER.error("wait interrupted"); return new Result(-2, Status.EXCEPTION); } } else { return new Result(-3, Status.EXCEPTION); } } if (lastTimestamp == timestamp) { sequence = (sequence + 1) & sequenceMask; if (sequence == 0) { //seq 為0的時候表示是下一毫秒時間開始對seq做隨機 sequence = RANDOM.nextInt(100); timestamp = tilNextMillis(lastTimestamp); } } else { //如果是新的ms開始 sequence = RANDOM.nextInt(100); } lastTimestamp = timestamp; long id = ((timestamp - twepoch) << timestampLeftShift) | (workerId << workerIdShift) | sequence; return new Result(id, Status.SUCCESS); } /** * 通過死循環(huán)的形式確保時間進(jìn)行了后移,因為最多也就是停留一毫秒,所以使用死循環(huán)的形式代價更低 */ protected long tilNextMillis(long lastTimestamp) { long timestamp = timeGen(); while (timestamp <= lastTimestamp) { timestamp = timeGen(); } return timestamp; } protected long timeGen() { return System.currentTimeMillis(); } public long getWorkerId() { return workerId; } public static void main(String[] args) { Date date = new Date("Mon 6 Jan 1997 13:3:00"); long id = ((System.currentTimeMillis()- date.getTime()) << 22L) | (10 << 12L) | (1) & ~(-1L << 12L); System.out.println(id); System.out.println(10 << 12L); System.out.println(Long.toBinaryString(id)); }}分段id生成器
美團(tuán)實現(xiàn)的分段id生成器使用的則是mysql數(shù)據(jù)庫完成數(shù)據(jù)的共享,并通過update語句加鎖完成并發(fā)控制。接下來分析一下分段id生成的核心代碼就一個類
package com.sankuai.inf.leaf.segment;import com.sankuai.inf.leaf.IDGen;import com.sankuai.inf.leaf.common.Result;import com.sankuai.inf.leaf.common.Status;import com.sankuai.inf.leaf.segment.dao.IDAllocDao;import com.sankuai.inf.leaf.segment.model.*;import org.perf4j.StopWatch;import org.perf4j.slf4j.Slf4JStopWatch;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.*;import java.util.concurrent.*;import java.util.concurrent.atomic.AtomicLong;public class SegmentIDGenImpl implements IDGen { private static final Logger logger = LoggerFactory.getLogger(SegmentIDGenImpl.class); /** * IDCache未初始化成功時的異常碼 */ private static final long EXCEPTION_ID_IDCACHE_INIT_FALSE = -1; /** * key不存在時的異常碼 */ private static final long EXCEPTION_ID_KEY_NOT_EXISTS = -2; /** * SegmentBuffer中的兩個Segment均未從DB中裝載時的異常碼 */ private static final long EXCEPTION_ID_TWO_SEGMENTS_ARE_NULL = -3; /** * 最大步長不超過100,0000 */ private static final int MAX_STEP = 1000000; /** * 一個Segment維持時間為15分鐘 * 這里指的是一個系統(tǒng)默認(rèn)認(rèn)為的合理時間,主要用于調(diào)整buffer里面步長的大小,如果當(dāng)前次更新距離上次更新時間超過15分鐘的話 * 那么步長就會動態(tài)調(diào)整為二分之一之前的長度,如果說當(dāng)次更新時間距離上次更新時間未超過15分鐘那么說明系統(tǒng)壓力大,那么就適當(dāng)調(diào)整步長到2倍直到最大步長 * MAX_STEP */ private static final long SEGMENT_DURATION = 15 * 60 * 1000L; /** * 用來更新本地的buffer的線程池,用來更新每個tag對應(yīng)的segmentBufferr里面?zhèn)溆玫膕egment */ private ExecutorService service = new ThreadPoolExecutor(5, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new UpdateThreadFactory()); /** * 初始化狀態(tài),主要用來標(biāo)記mysql中tag是否初次被同步進(jìn)入內(nèi)存中 */ private volatile boolean initOK = false; /** * 用來保存每個tag對應(yīng)的segmentBuffer,業(yè)務(wù)通過tag進(jìn)行隔離,并且此處使用了并發(fā)安全的容器,主要是防止在刷新tag的時候出現(xiàn)線程不安全的問題 */ private Map<String, SegmentBuffer> cache = new ConcurrentHashMap<String, SegmentBuffer>(); /** * 主要是用來與mysql打交道,加載tag,加載step,更新maxid */ private IDAllocDao dao; /** * 作者比較優(yōu)秀,為了刷新線程起一個比較好聽的名字特意寫了個一個工廠,哈哈并且內(nèi)部做了一個線程計數(shù)的變量 */ public static class UpdateThreadFactory implements ThreadFactory { private static int threadInitNumber = 0; private static synchronized int nextThreadNum() { return threadInitNumber++; } @Override public Thread newThread(Runnable r) { return new Thread(r, "Thread-Segment-Update-" + nextThreadNum()); } } /** * 暴露給外部調(diào)用用來初始化分段id生成器的功能,主要包括更新所有的tag進(jìn)入到內(nèi)存中,并且啟動一個單線程的守護(hù)線程去做定時刷新這些tag的操作, * 間隔60秒,這里之所以用單線程的線程池我個人的判斷是為了充分利用阻塞的特性,因為在極端的情況下60秒加載不完那么就阻塞著在哪里,當(dāng)然,絕大多數(shù)業(yè)務(wù) * 一分鐘肯定是能夠加載完的。 */ @Override public boolean init() { logger.info("Init ..."); // 確保加載到kv后才初始化成功 updateCacheFromDb(); initOK = true; updateCacheFromDbAtEveryMinute(); return initOK; } /** * 刷新緩存的方法。單線程每隔60秒刷新一次tag,與mysql 同步一次tag的信息 */ private void updateCacheFromDbAtEveryMinute() { ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("check-idCache-thread"); t.setDaemon(true); return t; } }); service.scheduleWithFixedDelay(new Runnable() { @Override public void run() { updateCacheFromDb(); } }, 60, 60, TimeUnit.SECONDS); } /** * 從mysql中同步tag的信息進(jìn)入內(nèi)存中,這里作者做的也很巧妙,并不著急立馬就去加載segment我們看到SegmentBuffer里面有一個初始化是否成功的標(biāo)志字段 * initOk 他標(biāo)志著目前這個segmentBuffer是否可用,但是這個方法里面默認(rèn)是false的,作者在這里巧妙的利用了懶加載的方式,將max的值的更改延后,因為我們思考一種弄場景 * leaf在美團(tuán)中可能是全集團(tuán)公用的,可能部署了上百個節(jié)點,那么很有可能這些服務(wù)會面臨重啟,如果每次重啟都會默認(rèn)更新mysql的話,一方面會浪費非常多的step的id,另外一方面很有可能 * 就算浪費了id也可能會用不到,因此這里面用戶使用了懶加載的思想只是先進(jìn)行占位,當(dāng)用戶在真正使用的時候再去查詢并填充segment并更新mysql,因此這里面有個細(xì)節(jié)就是 * 如果系統(tǒng)極端在乎平滑性,那么在leaf在對外提供服務(wù)前,先手動調(diào)用一次,以確保segment被填充完善,降低延時性。 */ private void updateCacheFromDb() { logger.info("update cache from db"); StopWatch sw = new Slf4JStopWatch(); try { List<String> dbTags = dao.getAllTags(); if (dbTags == null || dbTags.isEmpty()) { return; } List<String> cacheTags = new ArrayList<String>(cache.keySet()); Set<String> insertTagsSet = new HashSet<>(dbTags); Set<String> removeTagsSet = new HashSet<>(cacheTags); //db中新加的tags灌進(jìn)cache for(int i = 0; i < cacheTags.size(); i++){ String tmp = cacheTags.get(i); if(insertTagsSet.contains(tmp)){ insertTagsSet.remove(tmp); } } for (String tag : insertTagsSet) { SegmentBuffer buffer = new SegmentBuffer(); buffer.setKey(tag); Segment segment = buffer.getCurrent(); segment.setValue(new AtomicLong(0)); segment.setMax(0); segment.setStep(0); cache.put(tag, buffer); logger.info("Add tag {} from db to IdCache, SegmentBuffer {}", tag, buffer); } //cache中已失效的tags從cache刪除 for(int i = 0; i < dbTags.size(); i++){ String tmp = dbTags.get(i); if(removeTagsSet.contains(tmp)){ removeTagsSet.remove(tmp); } } for (String tag : removeTagsSet) { cache.remove(tag); logger.info("Remove tag {} from IdCache", tag); } } catch (Exception e) { logger.warn("update cache from db exception", e); } finally { sw.stop("updateCacheFromDb"); } } /** * 主力接口,用于對外界提供id * 判斷當(dāng)前tag緩存是否已經(jīng)就緒,如果未就緒直接報錯,因此要求調(diào)用方應(yīng)該先調(diào)用init(),進(jìn)行基礎(chǔ)環(huán)境的就緒 * 緩存就緒成功,從緩存中查看客戶端請求的key是否存在,不存在的可能有兩種,一種是mysql中沒有,這個需要等大概60秒才會刷新,因此在leaf使用過程中應(yīng)該提前就緒好mysql,讓讓多個leaf服務(wù)都能刷新到相應(yīng)的key * 另外一種可能就是mysql中也沒有,當(dāng)然也會造成cache中沒有,兩種情況造成的緩存中沒有,系統(tǒng)都會返回key不存在,id生成失敗 * 如果緩存中也恰好查到了有key,那么就會因為懶加載的原因造成可能segmentBuffer沒有初始化,(任何事情都有兩面性) * 我們看到美團(tuán)的處理方式是通過鎖定對應(yīng)的segmentBuffer的對象頭,可以說也是無所不用其極的減低鎖粒度,不得不說一句nice * 另外我們看到使用了雙重檢查,防止并發(fā)問題,這里多啰嗦一句為什么回出現(xiàn)并發(fā)問題,兩個線程都到了synchronized的臨界區(qū)后,一個線程拿到了buffer的頭鎖,進(jìn)入可能去更新mysql了,如果他執(zhí)行完他會放開頭鎖 * 但是如果不通過判斷那么他也會繼續(xù)執(zhí)行更新mysql的操作,因此造成不滿足我們預(yù)期的事情發(fā)生了,所以這里通過一個initok的一個標(biāo)志進(jìn)行雙重判定,那么就算是第二個線程進(jìn)入后因為第一個線程退出前就更新了linitok為true * 所以第二個線程進(jìn)來后還是不能更新mysql就安全出去臨界區(qū)了。 * */ @Override public Result get(final String key) { if (!initOK) { return new Result(EXCEPTION_ID_IDCACHE_INIT_FALSE, Status.EXCEPTION); } if (cache.containsKey(key)) { SegmentBuffer buffer = cache.get(key); if (!buffer.isInitOk()) { synchronized (buffer) { if (!buffer.isInitOk()) { try { updateSegmentFromDb(key, buffer.getCurrent()); logger.info("Init buffer. Update leafkey {} {} from db", key, buffer.getCurrent()); buffer.setInitOk(true); } catch (Exception e) { logger.warn("Init buffer {} exception", buffer.getCurrent(), e); } } } } return getIdFromSegmentBuffer(cache.get(key)); } return new Result(EXCEPTION_ID_KEY_NOT_EXISTS, Status.EXCEPTION); } /** * 這個方法主要是用來更新并填充好,指定key對應(yīng)的SegmentBuffer * StopWatch 是一個計時器,作者考慮到這個方法的性能問題,因此加了一個監(jiān)控 * 先是判斷指定的segmentBuffer是否初始化完成,如果沒有初始化完成也就是說沒有向數(shù)據(jù)庫去申請id段,那么就去取申請并填充進(jìn)segmentBuffer * 如果是已經(jīng)初始化完成了,第二個分支其實特定指的是第二次申請 */ public void updateSegmentFromDb(String key, Segment segment) { StopWatch sw = new Slf4JStopWatch(); SegmentBuffer buffer = segment.getBuffer(); LeafAlloc leafAlloc; //第一次申請id段 if (!buffer.isInitOk()) { leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key); buffer.setStep(leafAlloc.getStep()); buffer.setMinStep(leafAlloc.getStep());//leafAlloc中的step為DB中的step } else if (buffer.getUpdateTimestamp() == 0) { //第二次申請id段,因為之前的第一次申請動作談不上更新,因此在第二次的時候?qū)⒏聲r間進(jìn)行填充 leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key); buffer.setUpdateTimestamp(System.currentTimeMillis()); buffer.setStep(leafAlloc.getStep()); buffer.setMinStep(leafAlloc.getStep());//leafAlloc中的step為DB中的step } else { //第N次申請id段動態(tài)優(yōu)化步長,我們看到有一個指定的時間以15分鐘為例,如果兩次領(lǐng)取間隔少于15分鐘那么就將step拉大一倍,但是不會超過系統(tǒng)默認(rèn)的10W的step // 這樣做的好處其實也是降低mysql壓力 //如果兩次申請的超過30分鐘那么就將步長調(diào)整為原來的一半,但是不會小于最小步長 long duration = System.currentTimeMillis() - buffer.getUpdateTimestamp(); int nextStep = buffer.getStep(); if (duration < SEGMENT_DURATION) { if (nextStep * 2 > MAX_STEP) { //do nothing } else { nextStep = nextStep * 2; } } else if (duration < SEGMENT_DURATION * 2) { //do nothing with nextStep } else { nextStep = nextStep / 2 >= buffer.getMinStep() ? nextStep / 2 : nextStep; } logger.info("leafKey[{}], step[{}], duration[{}mins], nextStep[{}]", key, buffer.getStep(), String.format("%.2f",((double)duration / (1000 * 60))), nextStep); LeafAlloc temp = new LeafAlloc(); temp.setKey(key); temp.setStep(nextStep); leafAlloc = dao.updateMaxIdByCustomStepAndGetLeafAlloc(temp); buffer.setUpdateTimestamp(System.currentTimeMillis()); buffer.setStep(nextStep); buffer.setMinStep(leafAlloc.getStep());//leafAlloc的step為DB中的step } // must set value before set max /** * 此處很坑,這是第一版本留下的無效注釋 * https://github.com/Meituan-Dianping/Leaf/issues/16 * 可以不用強制要求的,因為是單線程更新,并且buffer還沒有就緒因此不存在優(yōu)先可見的問題 */ long value = leafAlloc.getMaxId() - buffer.getStep(); segment.getValue().set(value); segment.setMax(leafAlloc.getMaxId()); segment.setStep(buffer.getStep()); sw.stop("updateSegmentFromDb", key + " " + segment); } /** * 核心處理方法 * 通過讀寫鎖提升并發(fā),讀鎖主要負(fù)責(zé)id的自增,但是如果只是自增那么靠automic操作就夠,因此還涉及到segment的切換,因此此處使用了讀寫鎖進(jìn)行分離 * 當(dāng)需要切換segment的時候讀鎖也會被掛起來,因為如果不掛起的話會出現(xiàn)臟讀。 * 方法的核心思想總結(jié)如下 * 通過while循環(huán),死循環(huán)的去取數(shù)據(jù),先是拿到讀鎖,此處總結(jié)一下 JUC包里面的讀寫鎖的特性,讀讀可并行,讀寫不可并行,寫寫不可并行。 * 在這里從概念上先完成梳理 * 1、備用buffer的更新是由單線程完成的,這里面是通過cas更新ThreadRunning實現(xiàn)的,因此備用buffer的更新是安全的 * 2、id的自增是通過AutomicLong實現(xiàn)的因此也不存在自增時候的線程安全問題 * 3、主備buffer的切換是由讀寫鎖來進(jìn)行控制的,讀鎖生效時候時候要么能夠自增成功則返回,要么自增不成功,線程開始搶寫鎖,如果搶上,那么新來的讀鎖請求就會被掛起, * 直到寫鎖完成buffer的切換,然后通過while循環(huán)自增后返回id */ public Result getIdFromSegmentBuffer(final SegmentBuffer buffer) { while (true) { buffer.rLock().lock(); try { final Segment segment = buffer.getCurrent(); if (!buffer.isNextReady() && (segment.getIdle() < 0.9 * segment.getStep()) && buffer.getThreadRunning().compareAndSet(false, true)) { service.execute(new Runnable() { @Override public void run() { Segment next = buffer.getSegments()[buffer.nextPos()]; boolean updateOk = false; try { updateSegmentFromDb(buffer.getKey(), next); updateOk = true; logger.info("update segment {} from db {}", buffer.getKey(), next); } catch (Exception e) { logger.warn(buffer.getKey() + " updateSegmentFromDb exception", e); } finally { if (updateOk) { buffer.wLock().lock(); buffer.setNextReady(true); buffer.getThreadRunning().set(false); buffer.wLock().unlock(); } else { buffer.getThreadRunning().set(false); } } } }); } long value = segment.getValue().getAndIncrement(); if (value < segment.getMax()) { return new Result(value, Status.SUCCESS); } } finally { buffer.rLock().unlock(); } waitAndSleep(buffer); buffer.wLock().lock(); try { //這里進(jìn)行這么判斷是因為可能有多個寫鎖排隊在這里,一個寫鎖更新成了后,那么后面的線程直接取就好,不需要走后續(xù)的修改操作了。 final Segment segment = buffer.getCurrent(); long value = segment.getValue().getAndIncrement(); if (value < segment.getMax()) { return new Result(value, Status.SUCCESS); } //檢查備用buffer是否完成了準(zhǔn)備,如果準(zhǔn)備完成則進(jìn)行切換,如果未準(zhǔn)備完成則拋出異常代表主buffer還有從buffer都沒有準(zhǔn)備好,系統(tǒng)暫時不可用。 //產(chǎn)生的原因可能是刷新線程池阻塞,這可能性還是蠻小的,這也是為什么中間件作者在 update代碼段加入 stopwatch監(jiān)控的原因吧。 if (buffer.isNextReady()) { buffer.switchPos(); buffer.setNextReady(false); } else { logger.error("Both two segments in {} are not ready!", buffer); return new Result(EXCEPTION_ID_TWO_SEGMENTS_ARE_NULL, Status.EXCEPTION); } } finally { buffer.wLock().unlock(); } } } /** * 讓當(dāng)前線程進(jìn)入死循環(huán)等待,為了降低無效的cpu輪訓(xùn)如果循環(huán)次數(shù)超過一萬后就休眠10ms */ private void waitAndSleep(SegmentBuffer buffer) { int roll = 0; while (buffer.getThreadRunning().get()) { roll += 1; if(roll > 10000) { try { TimeUnit.MILLISECONDS.sleep(10); break; } catch (InterruptedException e) { logger.warn("Thread {} Interrupted",Thread.currentThread().getName()); break; } } } } public List<LeafAlloc> getAllLeafAllocs() { return dao.getAllLeafAllocs(); } public Map<String, SegmentBuffer> getCache() { return cache; } public IDAllocDao getDao() { return dao; } public void setDao(IDAllocDao dao) { this.dao = dao; }}
以上就是關(guān)于pos機系統(tǒng)源碼,美團(tuán)leaf源碼解析的知識,后面我們會繼續(xù)為大家整理關(guān)于pos機系統(tǒng)源碼的知識,希望能夠幫助到大家!









