irpas技术客

node.js连接elasticsearch并增删索引/批量导入_我想找个厂上班_node连接elasticsearch

未知 8302

参考:🚀 ElasticSearch, GraphQL, React, Node+Express🤯! How?https://medium.com/@andrewrymaruk/elasticsearch-graphql-react-node-express-how-cb2c2cc708f8

官方文档https://·/package/@elastic/elasticsearch

以下代码除了数据都写在一个js里。

安装 npm i @elastic/elasticsearch 创建client // .../elasticSearch.js (随便创建一个js) import { Client } from '@elastic/elasticsearch'; const client = new Client({ node: elconf.url,// 'http://localhost:9200' requestTimeout: 60000, }) 判断索引存在 // .../elasticSearch.js (随便创建一个js) export const elIndexExist = async (indexName) => { const existIndex = await client.indices.exists({ index: indexName }); return existIndex.body === true; } 创建索引

不自己判断存在的话,一旦重复创建es会给报一长串error,说我程序crash,头大

// .../elasticSearch.js (随便创建一个js) export const elCreateIndexes = async (indexToCreate = [elconf.index]) => { return Promise.all( indexToCreate.map((index) => { const indexName = `${index}_${new Date().getTime()}`;// 'test_user' // 判断index是否存在 return client.indices.exists({ index: indexName }).then((result) => { if (result === false) { // 创建 return client.indices.create({ index: indexName, body: { aliases: { [index]: {} } } }).catch((e) => { throw DatabaseError('Error 创建索引', { error: e }); }); } /* istanbul ignore next */ return result; }); }) ); }; 删除索引 // .../elasticSearch.js (随便创建一个js) export const elDeleteIndexes = async (indexesToDelete) => { Promise.all( indexesToDelete.map((index) => { const indexName = `${elconf.index_prefix}_${index}`;// 'test_user' return client.indices.delete({ index: indexName }).catch((err) => { console.log(`[ELASTICSEARCH] 删除索引失败`, { error: err }); }) }) ) return true; } 批量操作(批量导入数据)

通过json文件批量导入数据

1.通过import导入

// .../userdata.js // 实际文件中没有注释行 { "userdata": [ { "id": 1, "name": "jon", "password": "123456" }, { "id": 2, "name": "jon", "password": "123456" }, { "id": 3, "name": "jon", "password": "123456" }, { "id": 4, "name": "jon", "password": "23456" }, { "id": 5, "name": "test", "password": "123456" } ] }

我是es6在package.json写了?"type": "module",所以不能直接require,也不能import导入。

如果有此问题,可以参考以下链接。解决参考https://blog.csdn.net/weixin_43343144/article/details/121456830

?导入代码,让测试数据中的id作为elasticsearch的_id属性

// .../elasticSearch.js (随便创建一个js) // 测试数据 import USERDATAS from '../data/userdata.json' export const elBulk = async (index) => { let operations = USERDATAS.userdata.flatMap(doc => [{ index: { _index: index, _id: doc.id } }, doc])// 官方例子 const args = { refresh: true, operations }; client .bulk(args) .then((result) => { if (result.errors) { throw DatabaseError('Error 批量处理索引数据', { errors }); } return result; }) .catch((e) => { throw DatabaseError('Error 更新elastic', { error: e }) }) return true; }

2.通过fs读取文件导入

我不是很懂nodejs异步问题,debug的时候发现fs.readFile不是按顺序执行,但是对Promise和回调不熟悉。

因此最后选择使用fs/promise完成这个功能(它是用promise方式封装了异步)

读取文件转为json对象的函数:

// .../elasticSearch.js (随便创建一个js) import fs from 'fs/promises'; const readJsonAsync = async (fileName) => { let data = await fs.readFile(fileName); const fileData = JSON.parse(data.toString()); console.log(fileData) return fileData }

调用:

// .../elasticSearch.js (随便创建一个js) export const elBulk = async (index) => { let USERDATAS = await readJsonAsync('./src/data/userdata.json'); // 包装数据 let operations = USERDATAS.objects.flatMap(doc => [{ index: { _index: index, _id: doc.id } }, doc]) const args = { refresh: true, operations }; // 导入 client .bulk(args) .then((result) => { if (result.errors) { throw DatabaseError('Error 批量处理索引数据', { errors }); } return result; }) .catch((e) => { throw DatabaseError('Error 更新elastic', { error: e }) }) return true; }


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #参考 #ElasticSearch #graphql #React