之前博客有用logstash-input-jdbc同步mysql数据到ElasticSearch,但是由于同步时间最少是一分钟一次,无法满足线上业务,所以只能自己实现一个,但是时间比较紧,所以简单实现一个
思路:
网上有很多思路用什么mysql的binlog功能什么的,但是我对mysql了解实在有限,所以用一个很呆板的办法查询mysql得到数据,再插入es,因为数据量不大,而且10秒间隔同步一次,效率还可以,为了避免服务器之间的时间差和mysql更新和查询产生的时间差,所以在查询更新时间条件时是和上一次同步开始时间比较,这样不管数据多少,更新耗时多少都不会少数据,因为原则是同步不漏掉任何数据,也可以程序多开将时间差和间隔时间差异化,因为用mysql中一个id当作es中的id,也避免了重复数据
使用:
只需要按照escongif.py写配置文件,然后写sql文件,最后直接执行mstes.py就可以了,我这个也是参考logstash-input-jdbc的配置形式
MsToEs
|----esconfig.py(配置文件)
|----mstes.py(同步程序)
|----sql_manage.py(数据库管理)
|----aa.sql(需要用到sql文件)
|----bb.sql(需要用到sql文件)
sql_manage.py:
# -*-coding:utf-8 -*- __author__ = "ZJL" from sqlalchemy.pool import QueuePool from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker, scoped_session import traceback import esconfig # 用于不需要回滚和提交的操作 def find(func): def wrapper(self, *args, **kwargs): try: return func(self, *args, **kwargs) except Exception as e: print(traceback.format_exc()) print(str(e)) return traceback.format_exc() finally: self.session.close() return wrapper class MysqlManager(object): def __init__(self): mysql_connection_string = esconfig.mysql.get("mysql_connection_string") self.engine = create_engine('mysql+pymysql://'+mysql_connection_string+'"htmlcode">select CONVERT(c.`id`,CHAR) as id, c.`code` as code, c.`project_name` as project_name, c.`name` as name, date_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s') as update_time, from `cc` c where date_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s')>='::datetime_now';bb.sql:
select CONVERT(c.`id`,CHAR) as id, CONVERT(c.`age`,CHAR) as age, c.`code` as code, c.`name` as name, c.`project_name` as project_name, date_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s') as update_time, from `bb` c where date_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s')>='::datetime_now';esconfig.py:
# -*- coding: utf-8 -*- #__author__="ZJL" # sql 文件名与es中的type名一致 mysql = { # mysql连接信息 "mysql_connection_string": "root:123456@127.0.0.1:3306/xxx", # sql文件信息 "statement_filespath":[ # sql对应的es索引和es类型 { "index":"a1", "sqlfile":"aa.sql", "type":"aa" }, { "index":"a1", "sqlfile":"bb.sql", "type":"bb" }, ], } # es的ip和端口 elasticsearch = { "hosts":"127.0.0.1:9200", } # 字段顺序与sql文件字段顺序一致,这是存进es中的字段名,这里用es的type名作为标识 db_field = { "aa": ("id", "code", "name", "project_name", "update_time", ), "bb": ("id", "code", "age", "project_name", "name", "update_time", ), } es_config = { # 间隔多少秒同步一次 "sleep_time":10, # 为了解决服务器之间时间差问题 "time_difference":3, # show_json 用来展示导入的json格式数据, "show_json":False, }mstes.py:
# -*- coding: utf-8 -*- #__author__="ZJL" from sql_manage import MysqlManager from esconfig import mysql,elasticsearch,db_field,es_config from elasticsearch import Elasticsearch from elasticsearch import helpers import traceback import time class TongBu(object): def __init__(self): try: # 是否展示json数据在控制台 self.show_json = es_config.get("show_json") # 间隔多少秒同步一次 self.sleep_time = es_config.get("sleep_time") # 为了解决同步时数据更新产生的误差 self.time_difference = es_config.get("time_difference") # 当前时间,留有后用 self.datetime_now = "" # es的ip和端口 es_host = elasticsearch.get("hosts") # 连接es self.es = Elasticsearch(es_host) # 连接mysql self.mm = MysqlManager() except : print(traceback.format_exc()) def tongbu_es_mm(self): try: # 同步开始时间 start_time = time.time() print("start..............",time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start_time))) # 这个list用于批量插入es actions = [] # 获得所有sql文件list statement_filespath = mysql.get("statement_filespath",[]) if self.datetime_now: # 当前时间加上时间差(间隔时间加上执行同步用掉的时间,等于上一次同步开始时间)再字符串格式化 # sql中格式化时间时年月日和时分秒之间不能空格,不然导入es时报解析错误,所以这里的时间格式化也统一中间加一个T self.datetime_now = time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime(time.time()-(self.sleep_time+self.time_difference))) else: self.datetime_now = "1999-01-01T00:00:00" if statement_filespath: for filepath in statement_filespath: # sql文件 sqlfile = filepath.get("sqlfile") # es的索引 es_index = filepath.get("index") # es的type es_type = filepath.get("type") # 读取sql文件内容 with open(sqlfile,"r") as opf: sqldatas = opf.read() # ::datetime_now是一个自定义的特殊字符串用于增量更新 if "::datetime_now" in sqldatas: sqldatas = sqldatas.replace("::datetime_now",self.datetime_now) else: sqldatas = sqldatas # es和sql字段的映射 dict_set = db_field.get(es_type) # 访问mysql,得到一个list,元素都是字典,键是字段名,值是数据 db_data_list = self.mm.select_all_dict(sqldatas, dict_set) if db_data_list: # 将数据拼装成es的格式 for db_data in db_data_list: action = { "_index": es_index, "_type": es_type, "@timestamp": time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime(time.time())), "_source": db_data } # 如果没有id字段就自动生成 es_id = db_data.get("id", "") if es_id: action["_id"] = es_id # 是否显示json再终端 if self.show_json: print(action) # 将拼装好的数据放进list中 actions.append(action) # list不为空就批量插入数据到es中 if len(actions) > 0 : helpers.bulk(self.es, actions) except Exception as e: print(traceback.format_exc()) else: end_time = time.time() print("end...................",time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start_time))) self.time_difference = end_time-start_time finally: # 报错就关闭数据库 self.mm.close() def main(): tb = TongBu() # 间隔多少秒同步一次 sleep_time = tb.sleep_time # 死循环执行导入数据,加上时间间隔 while True: tb.tongbu_es_mm() time.sleep(sleep_time) if __name__ == '__main__': main()以上这篇用python简单实现mysql数据同步到ElasticSearch的教程就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持。
免责声明:本站资源来自互联网收集,仅供用于学习和交流,请遵循相关法律法规,本站一切资源不代表本站立场,如有侵权、后门、不妥请联系本站删除!
《魔兽世界》大逃杀!60人新游玩模式《强袭风暴》3月21日上线
暴雪近日发布了《魔兽世界》10.2.6 更新内容,新游玩模式《强袭风暴》即将于3月21 日在亚服上线,届时玩家将前往阿拉希高地展开一场 60 人大逃杀对战。
艾泽拉斯的冒险者已经征服了艾泽拉斯的大地及遥远的彼岸。他们在对抗世界上最致命的敌人时展现出过人的手腕,并且成功阻止终结宇宙等级的威胁。当他们在为即将于《魔兽世界》资料片《地心之战》中来袭的萨拉塔斯势力做战斗准备时,他们还需要在熟悉的阿拉希高地面对一个全新的敌人──那就是彼此。在《巨龙崛起》10.2.6 更新的《强袭风暴》中,玩家将会进入一个全新的海盗主题大逃杀式限时活动,其中包含极高的风险和史诗级的奖励。
《强袭风暴》不是普通的战场,作为一个独立于主游戏之外的活动,玩家可以用大逃杀的风格来体验《魔兽世界》,不分职业、不分装备(除了你在赛局中捡到的),光是技巧和战略的强弱之分就能决定出谁才是能坚持到最后的赢家。本次活动将会开放单人和双人模式,玩家在加入海盗主题的预赛大厅区域前,可以从强袭风暴角色画面新增好友。游玩游戏将可以累计名望轨迹,《巨龙崛起》和《魔兽世界:巫妖王之怒 经典版》的玩家都可以获得奖励。
更新日志
- 玩家分享《黑神话:悟空》大怨种:想痛快玩游戏花了快400块
- 作者回应《黑神话》墙绘被毁:当地相关部门支持重绘
- 李泉.1995-上海梦【魔岩】【WAV+CUE】
- 何雨雯.1994-给你的歌【天王唱片】【WAV+CUE】
- 群星.1994-神摇第一章·极乐扬州路【D.I.Y】【WAV+CUE】
- 《车烧友 发烧情歌天碟3CD》[WAV/分轨][1.9GB]
- 《薛之谦 情歌不变招牌 霸气情歌2CD》[WAV/分轨][1.2GB]
- 《刘若英 滚石SACD精选》[ISO][1GB]
- 仙境传说新启航牧师怎么加点 牧师属性技能加点推荐
- 仙境传说新启航舞娘怎么加点 舞娘属性技能加点推荐
- 魔兽世界祖尔格拉布在哪 魔兽世界祖尔格拉布位置介绍
- 群星《名列前茅五大顶级发烧男声》2CD[DTS-WAV]
- 群星《经典再现·国语女声》2CD[DTS-WAV]
- 群星《十大民歌天后》2CD[DTS-WAV分轨]
- 《黑神话:悟空》浮屠牢是最糟糕的部分:但也是做得最好的